aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/comp_nodes
diff options
context:
space:
mode:
author: Maxim Yurchuk <maxim-yurchuk@ydb.tech> 2024-12-12 15:00:43 +0000
committer: GitHub <noreply@github.com> 2024-12-12 15:00:43 +0000
commit: 42701242eaf5be980cb935631586d0e90b82641c (patch)
tree: 6dbf5fcd37d3c16591e196c4a69d166e3ab3a398 /yql/essentials/minikql/comp_nodes
parent: 7f5a9f394dbd9ac290cabbb7977538656b3a541e (diff)
parent: f7c04b5876af3d16849ab5e3079c0eabbd4e3a00 (diff)
download: ydb-42701242eaf5be980cb935631586d0e90b82641c.tar.gz
Merge pull request #12554 from vitalyisaev2/YQ-3839.with_rightlib.3
Import from Arcadia + YDB FQ: turning gateways_config.proto into a file without external dependencies
Diffstat (limited to 'yql/essentials/minikql/comp_nodes')
-rw-r--r-- yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp | 976
-rw-r--r-- yql/essentials/minikql/comp_nodes/mkql_rh_hash.h | 51
-rw-r--r-- yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp | 2518
-rw-r--r-- yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp | 331
-rw-r--r-- yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h | 113
-rw-r--r-- yql/essentials/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp | 84
-rw-r--r-- yql/essentials/minikql/comp_nodes/ut/ya.make.inc | 1
7 files changed, 2045 insertions, 2029 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
index e5b53bee5c..230509757b 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
@@ -1,9 +1,10 @@
-#include "mkql_map_join.h"
+#include "mkql_block_map_join.h"
#include <yql/essentials/minikql/computation/mkql_block_builder.h>
#include <yql/essentials/minikql/computation/mkql_block_impl.h>
#include <yql/essentials/minikql/computation/mkql_block_reader.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
+#include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
#include <yql/essentials/minikql/mkql_program_builder.h>
@@ -23,6 +24,19 @@ size_t CalcMaxBlockLength(const TVector<TType*>& items) {
}));
}
+// Combines the per-column key hashes into a single hash for the whole key tuple.
+// Returns 0 as soon as any column hash is 0, without combining the remaining ones.
+ui64 CalculateTupleHash(const std::vector<ui64>& hashes) {
+    ui64 combined = 0;
+    for (const ui64 columnHash : hashes) {
+        if (columnHash == 0) {
+            return 0;
+        }
+        combined = CombineHashes(combined, columnHash);
+    }
+    return combined;
+}
+
template <bool RightRequired>
class TBlockJoinState : public TBlockState {
public:
@@ -39,10 +53,12 @@ public:
{
const auto& pgBuilder = ctx.Builder->GetPgBuilder();
MaxLength_ = CalcMaxBlockLength(outputItems);
+ TBlockTypeHelper helper;
for (size_t i = 0; i < inputItems.size(); i++) {
- const TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
+ TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
+ Hashers_.push_back(helper.MakeHasher(blockItemType));
}
// The last output column (i.e. block length) doesn't require a block builder.
for (size_t i = 0; i < OutputWidth_; i++) {
@@ -91,6 +107,27 @@ public:
OutputRows_++;
}
+ // Adds one output row: the left columns listed in LeftIOMap_ (read at the current input
+ // row) followed by the given right columns. When rightColumns is empty, the remaining
+ // output columns receive default-constructed block items instead.
+ void MakeRow(const std::vector<NYql::NUdf::TBlockItem>& rightColumns) {
+     size_t builderIndex = 0;
+     for (const auto leftColumn : LeftIOMap_) {
+         AddItem(GetItem(leftColumn), builderIndex++);
+     }
+
+     if (rightColumns.empty()) {
+         // No right-side values were supplied: pad every remaining builder.
+         for (; builderIndex < OutputWidth_; builderIndex++) {
+             AddItem(TBlockItem(), builderIndex);
+         }
+     } else {
+         Y_ENSURE(LeftIOMap_.size() + rightColumns.size() == OutputWidth_);
+         for (const auto& rightItem : rightColumns) {
+             AddItem(rightItem, builderIndex++);
+         }
+     }
+
+     OutputRows_++;
+ }
+
void MakeBlocks(const THolderFactory& holderFactory) {
Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputRows_)));
OutputRows_ = 0;
@@ -102,14 +139,21 @@ public:
FillArrays();
}
- TBlockItem GetItem(size_t idx) const {
+ TBlockItem GetItem(size_t idx, size_t offset = 0) const {
+ Y_ENSURE(Current_ + offset < InputRows_);
const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr());
if (datum.is_scalar()) {
return Readers_[idx]->GetScalarItem(*datum.scalar());
}
MKQL_ENSURE(datum.is_array(), "Expecting array");
- return Readers_[idx]->GetItem(*datum.array(), Current_);
+ return Readers_[idx]->GetItem(*datum.array(), Current_ + offset);
+ }
+
+ // Reads the item of input column `idx` located `offset` rows past the current row and
+ // returns it paired with the hash produced by that column's hasher.
+ std::pair<TBlockItem, ui64> GetItemWithHash(size_t idx, size_t offset) const {
+     auto item = GetItem(idx, offset);
+     return {item, Hashers_[idx]->Hash(item)};
}
NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const {
@@ -117,7 +161,7 @@ public:
}
void Reset() {
- Next_ = 0;
+ Current_ = 0;
InputRows_ = GetBlockCount(Inputs_.back());
}
@@ -125,12 +169,8 @@ public:
IsFinished_ = true;
}
- bool NextRow() {
- if (Next_ >= InputRows_) {
- return false;
- }
- Current_ = Next_++;
- return true;
+ void NextRow() {
+ Current_++;
}
bool HasBlocks() {
@@ -150,6 +190,11 @@ public:
return IsFinished_;
}
+ // Number of rows from the current position to the end of the current input block.
+ size_t RemainingRowsCount() const {
+     Y_ENSURE(Current_ <= InputRows_);
+     return InputRows_ - Current_;
+ }
+
NUdf::TUnboxedValue* GetRawInputFields() {
return Inputs_.data();
}
@@ -174,7 +219,6 @@ private:
}
size_t Current_ = 0;
- size_t Next_ = 0;
bool IsFinished_ = false;
size_t MaxLength_;
size_t BuilderAllocatedSize_ = 0;
@@ -190,307 +234,649 @@ private:
TVector<std::unique_ptr<IBlockReader>> Readers_;
TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
TVector<std::unique_ptr<IArrayBuilder>> Builders_;
+ TVector<NYql::NUdf::IBlockItemHasher::TPtr> Hashers_;
};
-template <bool WithoutRight, bool RightRequired, bool IsTuple>
-class TBlockWideMapJoinWrapper : public TMutableComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired, IsTuple>>
-{
-using TBaseComputation = TMutableComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired, IsTuple>>;
-using TState = TBlockJoinState<RightRequired>;
-public:
- TBlockWideMapJoinWrapper(TComputationMutables& mutables,
- const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftStreamItems,
- const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap,
- IComputationNode* stream, IComputationNode* dict)
- : TBaseComputation(mutables, EValueRepresentation::Boxed)
- , ResultJoinItems_(std::move(resultJoinItems))
- , LeftStreamItems_(std::move(leftStreamItems))
- , LeftKeyColumns_(std::move(leftKeyColumns))
- , LeftIOMap_(std::move(leftIOMap))
- , Stream_(stream)
- , Dict_(dict)
- , KeyTupleCache_(mutables)
- {}
+class TBlockIndex : public TComputationValue<TBlockIndex> {
+ struct TIndexEntry {
+ ui32 BlockOffset;
+ ui32 ItemOffset;
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- NUdf::TUnboxedValue* items = nullptr;
- const auto keys = KeyTupleCache_.NewArray(ctx, LeftKeyColumns_.size(), items);
- const auto state = ctx.HolderFactory.Create<TState>(ctx, LeftStreamItems_,
- LeftIOMap_, ResultJoinItems_);
- return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
- std::move(state),
- std::move(Stream_->GetValue(ctx)),
- std::move(Dict_->GetValue(ctx)),
- LeftKeyColumns_,
- std::move(keys), items);
- }
+ TIndexEntry() = default;
+ TIndexEntry(ui32 blockOffset, ui32 itemOffset)
+ : BlockOffset(blockOffset)
+ , ItemOffset(itemOffset)
+ {}
+ };
-private:
- class TStreamValue : public TComputationValue<TStreamValue> {
- using TBase = TComputationValue<TStreamValue>;
+ struct TIndexNode {
+ TIndexEntry Entry;
+ TIndexNode* Next;
+
+ TIndexNode() = delete;
+ TIndexNode(TIndexEntry entry, TIndexNode* next = nullptr)
+ : Entry(entry)
+ , Next(next)
+ {}
+ };
+
+ class TIndexMapValue {
public:
- TStreamValue(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory,
- NUdf::TUnboxedValue&& blockState, NUdf::TUnboxedValue&& stream,
- NUdf::TUnboxedValue&& dict, const TVector<ui32>& leftKeyColumns,
- NUdf::TUnboxedValue&& keyValue, NUdf::TUnboxedValue* keyItems)
- : TBase(memInfo)
- , BlockState_(blockState)
- , Stream_(stream)
- , Dict_(dict)
- , KeyValue_(keyValue)
- , KeyItems_(keyItems)
- , LeftKeyColumns_(leftKeyColumns)
- , HolderFactory_(holderFactory)
+ TIndexMapValue()
+ : Raw(0)
+ {}
+
+ TIndexMapValue(TIndexEntry entry) {
+ TIndexEntryUnion un;
+ un.Entry = entry;
+
+ Y_ENSURE(((un.Raw << 1) >> 1) == un.Raw);
+ Raw = (un.Raw << 1) | 1;
+ }
+
+ TIndexMapValue(TIndexNode* entryList)
+ : EntryList(entryList)
{}
+ bool IsInplace() const {
+ return Raw & 1;
+ }
+
+ TIndexNode* GetList() const {
+ Y_ENSURE(!IsInplace());
+ return EntryList;
+ }
+
+ TIndexEntry GetEntry() const {
+ Y_ENSURE(IsInplace());
+
+ TIndexEntryUnion un;
+ un.Raw = Raw >> 1;
+ return un.Entry;
+ }
+
private:
- NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
- auto& blockState = *static_cast<TState*>(BlockState_.AsBoxed().Get());
- auto* inputFields = blockState.GetRawInputFields();
- const size_t inputWidth = blockState.GetInputWidth();
- const size_t outputWidth = blockState.GetOutputWidth();
+ union TIndexEntryUnion {
+ TIndexEntry Entry;
+ ui64 Raw;
+ };
+
+ union {
+ TIndexNode* EntryList;
+ ui64 Raw;
+ };
+ };
- MKQL_ENSURE(width == outputWidth,
- "The given width doesn't equal to the result type size");
+ static_assert(sizeof(TIndexMapValue) == 8);
- while (!blockState.HasBlocks()) {
- while (blockState.IsNotFull() && blockState.NextRow()) {
- const auto key = MakeKeysTuple(blockState);
- if constexpr (WithoutRight) {
- if ((key && Dict_.Contains(key)) == RightRequired) {
- blockState.CopyRow();
- }
- } else if (NUdf::TUnboxedValue lookup; key && (lookup = Dict_.Lookup(key))) {
- blockState.MakeRow(lookup);
- } else if constexpr (!RightRequired) {
- blockState.MakeRow(NUdf::TUnboxedValue());
- }
+ using TBase = TComputationValue<TBlockIndex>;
+ using TIndexMap = TRobinHoodHashFixedMap<
+ ui64,
+ TIndexMapValue,
+ std::equal_to<ui64>,
+ std::hash<ui64>,
+ TMKQLAllocator<char>
+ >;
+
+public:
+ class TIterator {
+ enum class EIteratorType {
+ EMPTY,
+ INPLACE,
+ LIST
+ };
+
+ public:
+ TIterator() = default;
+
+ TIterator(TBlockIndex* blockIndex)
+ : Type_(EIteratorType::EMPTY)
+ , BlockIndex_(blockIndex)
+ {}
+
+ TIterator(TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+ : Type_(EIteratorType::INPLACE)
+ , BlockIndex_(blockIndex)
+ , Entry_(entry)
+ , EntryConsumed_(false)
+ , ItemsToLookup_(std::move(itemsToLookup))
+ {}
+
+ TIterator(TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+ : Type_(EIteratorType::LIST)
+ , BlockIndex_(blockIndex)
+ , Node_(node)
+ , ItemsToLookup_(std::move(itemsToLookup))
+ {}
+
+ TIterator(const TIterator&) = delete;
+ TIterator& operator=(const TIterator&) = delete;
+
+ TIterator(TIterator&& other) {
+ *this = std::move(other);
+ }
+
+ TIterator& operator=(TIterator&& other) {
+ if (this != &other) {
+ Type_ = other.Type_;
+ BlockIndex_ = other.BlockIndex_;
+ ItemsToLookup_ = std::move(other.ItemsToLookup_);
+
+ switch (Type_) {
+ case EIteratorType::EMPTY:
+ break;
+
+ case EIteratorType::INPLACE:
+ Entry_ = other.Entry_;
+ EntryConsumed_ = other.EntryConsumed_;
+ break;
+
+ case EIteratorType::LIST:
+ Node_ = other.Node_;
+ break;
}
- if (blockState.IsNotFull() && !blockState.IsFinished()) {
- switch (Stream_.WideFetch(inputFields, inputWidth)) {
- case NUdf::EFetchStatus::Yield:
- return NUdf::EFetchStatus::Yield;
- case NUdf::EFetchStatus::Ok:
- blockState.Reset();
- continue;
- case NUdf::EFetchStatus::Finish:
- blockState.Finish();
- break;
+
+ other.BlockIndex_ = nullptr;
+ }
+ return *this;
+ }
+
+ TMaybe<TIndexEntry> Next() {
+ Y_ENSURE(IsValid());
+
+ switch (Type_) {
+ case EIteratorType::EMPTY:
+ return Nothing();
+
+ case EIteratorType::INPLACE:
+ if (EntryConsumed_) {
+ return Nothing();
+ }
+
+ EntryConsumed_ = true;
+ return CheckEntry(Entry_) ? TMaybe<TIndexEntry>(Entry_) : Nothing();
+
+ case EIteratorType::LIST:
+ for (; Node_ != nullptr; Node_ = Node_->Next) {
+ if (CheckEntry(Node_->Entry)) {
+ auto entry = Node_->Entry;
+ Node_ = Node_->Next;
+ return entry;
}
- // Leave the loop, if no values left in the stream.
- Y_DEBUG_ABORT_UNLESS(blockState.IsFinished());
}
- if (blockState.IsEmpty()) {
- return NUdf::EFetchStatus::Finish;
+
+ return Nothing();
+ }
+ }
+
+ bool IsValid() const {
+ return BlockIndex_;
+ }
+
+ bool IsEmpty() const {
+ Y_ENSURE(IsValid());
+
+ switch (Type_) {
+ case EIteratorType::EMPTY:
+ return true;
+ case EIteratorType::INPLACE:
+ return EntryConsumed_;
+ case EIteratorType::LIST:
+ return Node_ == nullptr;
+ }
+ }
+
+ void Reset() {
+ *this = TIterator();
+ }
+
+ private:
+ // Verifies that the indexed row referenced by `entry` really carries the looked-up key.
+ // The index is keyed by the combined key hash only, so distinct keys can land in the
+ // same slot; the row is accepted only if every key column compares equal to the
+ // corresponding item in ItemsToLookup_.
+ bool CheckEntry(const TIndexEntry& entry) {
+     for (size_t i = 0; i < BlockIndex_->KeyColumns_.size(); i++) {
+         const auto keyColumn = BlockIndex_->KeyColumns_[i];
+         auto indexItem = BlockIndex_->GetItem(entry, keyColumn);
+         if (!BlockIndex_->Comparators_[keyColumn]->Equals(indexItem, ItemsToLookup_[i])) {
+             // One mismatching column is enough to reject the row.
+             return false;
}
- blockState.MakeBlocks(HolderFactory_);
}
- const auto sliceSize = blockState.Slice();
+     return true;
+ }
- for (size_t i = 0; i < outputWidth; i++) {
- output[i] = blockState.Get(sliceSize, HolderFactory_, i);
+ private:
+ EIteratorType Type_;
+ TBlockIndex* BlockIndex_ = nullptr;
+
+ union {
+ TIndexNode* Node_;
+ struct {
+ TIndexEntry Entry_;
+ bool EntryConsumed_;
+ };
+ };
+
+ std::vector<NYql::NUdf::TBlockItem> ItemsToLookup_;
+ };
+
+public:
+ // Creates an empty index over `stream`.
+ //   itemTypes  - block types of the stream columns; a reader, a hasher and a comparator
+ //                are created for the item type of each of them
+ //   keyColumns - key column indexes; stored by reference, so the vector must outlive
+ //                this object
+ //   stream     - the stream FetchStream() pulls blocks from
+ TBlockIndex(
+     TMemoryUsageInfo* memInfo,
+     const TVector<TType*>& itemTypes,
+     const TVector<ui32>& keyColumns,
+     NUdf::TUnboxedValue stream
+ )
+     : TBase(memInfo)
+     , InputsDescr_(ToValueDescr(itemTypes))
+     , KeyColumns_(keyColumns)
+     , Stream_(stream)
+     , Inputs_(itemTypes.size())
+ {
+     TBlockTypeHelper typeHelper;
+     for (TType* columnType : itemTypes) {
+         TType* itemType = AS_TYPE(TBlockType, columnType)->GetItemType();
+         Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), itemType));
+         Hashers_.push_back(typeHelper.MakeHasher(itemType));
+         Comparators_.push_back(typeHelper.MakeComparator(itemType));
+     }
+ }
+
+ NUdf::EFetchStatus FetchStream() {
+ switch (Stream_.WideFetch(Inputs_.data(), Inputs_.size())) {
+ case NUdf::EFetchStatus::Yield:
+ return NUdf::EFetchStatus::Yield;
+ case NUdf::EFetchStatus::Finish:
+ return NUdf::EFetchStatus::Finish;
+ case NUdf::EFetchStatus::Ok:
+ break;
+ }
+
+ std::vector<arrow::Datum> block;
+ for (size_t i = 0; i < Inputs_.size() - 1; i++) {
+ auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
+ ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
+ block.push_back(std::move(datum));
+ }
+
+ auto blockSize = GetBlockCount(Inputs_[Inputs_.size() - 1]);
+
+ std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch;
+ std::array<TIndexEntry, PrefetchBatchSize> insertBatchEntries;
+ ui32 insertBatchLen = 0;
+
+ auto processInsertBatch = [&]() {
+ Index_.BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t i, TIndexMap::iterator iter, bool isNew) {
+ auto value = static_cast<TIndexMapValue*>(Index_.GetMutablePayload(iter));
+ if (isNew) {
+ // Store single entry inplace
+ *value = TIndexMapValue(insertBatchEntries[i]);
+ Index_.CheckGrow();
+ } else {
+ // Store as list
+ if (value->IsInplace()) {
+ *value = TIndexMapValue(InsertIndexNode(value->GetEntry()));
+ }
+
+ *value = TIndexMapValue(InsertIndexNode(insertBatchEntries[i], value->GetList()));
+ }
+ });
+ };
+
+ Y_ENSURE(Data_.size() <= std::numeric_limits<ui32>::max());
+ Y_ENSURE(blockSize <= std::numeric_limits<ui32>::max());
+ for (size_t i = 0; i < blockSize; i++) {
+ ui64 keyHash = CalculateKeyHash(block, i);
+ if (!keyHash) {
+ continue;
}
- return NUdf::EFetchStatus::Ok;
+ insertBatchEntries[insertBatchLen] = TIndexEntry(Data_.size(), i);
+ insertBatch[insertBatchLen].ConstructKey(keyHash);
+ insertBatchLen++;
+
+ if (insertBatchLen == PrefetchBatchSize) {
+ processInsertBatch();
+ insertBatchLen = 0;
+ }
+ }
+
+ if (insertBatchLen > 0) {
+ processInsertBatch();
}
- NUdf::TUnboxedValue MakeKeysTuple(const TState& blockState) const {
- // TODO: Handle converters.
- if constexpr (!IsTuple) {
- return blockState.GetValue(HolderFactory_, LeftKeyColumns_.front());
+ Data_.push_back(std::move(block));
+ return NUdf::EFetchStatus::Ok;
+ }
+
+ template<typename TGetKey>
+ void BatchLookup(size_t batchSize, std::array<TBlockIndex::TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) {
+ Y_ENSURE(batchSize <= PrefetchBatchSize);
+
+ std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> lookupBatch;
+ std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> itemsBatch;
+
+ for (size_t i = 0; i < batchSize; i++) {
+ const auto& [items, keyHash] = getKey(i);
+ lookupBatch[i].ConstructKey(keyHash);
+ itemsBatch[i] = items;
+ }
+
+ Index_.BatchLookup({lookupBatch.data(), batchSize}, [&](size_t i, TIndexMap::iterator iter) {
+ if (!iter) {
+ // Empty iterator
+ iterators[i] = TIterator(this);
+ return;
}
- Y_ABORT_IF(KeyItems_ == nullptr);
- for (size_t i = 0; i < LeftKeyColumns_.size(); i++) {
- KeyItems_[i] = blockState.GetValue(HolderFactory_, LeftKeyColumns_[i]);
+ auto value = static_cast<TIndexMapValue*>(Index_.GetMutablePayload(iter));
+ if (value->IsInplace()) {
+ iterators[i] = TIterator(this, value->GetEntry(), std::move(itemsBatch[i]));
+ } else {
+ iterators[i] = TIterator(this, value->GetList(), std::move(itemsBatch[i]));
}
- return KeyValue_;
+ });
+ }
+
+ // Returns the item stored in column `columnIdx` of the indexed row referenced by `entry`.
+ // The last input slot is not a data column (FetchStream() reads the block length from
+ // it), hence the `Inputs_.size() - 1` bound.
+ TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) {
+     Y_ENSURE(columnIdx < Inputs_.size() - 1);
+
+     auto& storedBlock = Data_[entry.BlockOffset];
+     auto& datum = storedBlock[columnIdx];
+     MKQL_ENSURE(datum.is_array(), "Expecting array");
+     return Readers_[columnIdx]->GetItem(*datum.array(), entry.ItemOffset);
+ }
+
+ // Fills `row` from the indexed row referenced by `entry`: position i receives the item
+ // of column ioMap[i]. `row` must already hold exactly ioMap.size() elements.
+ void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) {
+     Y_ENSURE(row.size() == ioMap.size());
+     auto out = row.begin();
+     for (const ui32 columnIdx : ioMap) {
+         *out++ = GetItem(entry, columnIdx);
+     }
+ }
- NUdf::TUnboxedValue BlockState_;
- NUdf::TUnboxedValue Stream_;
- NUdf::TUnboxedValue Dict_;
- NUdf::TUnboxedValue KeyValue_;
- NUdf::TUnboxedValue* KeyItems_;
+private:
+ // Computes the combined hash of the key columns for row `offset` of `block`.
+ // Returns 0 when any key column item is empty at that row; FetchStream() does not
+ // index rows whose key hash is 0.
+ ui64 CalculateKeyHash(const std::vector<arrow::Datum>& block, size_t offset) const {
+     ui64 combined = 0;
+     for (const ui32 keyColumn : KeyColumns_) {
+         auto& columnDatum = block[keyColumn];
+         MKQL_ENSURE(columnDatum.is_array(), "Expecting array");
+
+         auto keyItem = Readers_[keyColumn]->GetItem(*columnDatum.array(), offset);
+         if (!keyItem) {
+             return 0;
+         }
- const TVector<ui32>& LeftKeyColumns_;
- const THolderFactory& HolderFactory_;
- };
+         combined = CombineHashes(combined, Hashers_[keyColumn]->Hash(keyItem));
+     }
+     return combined;
+ }
- void RegisterDependencies() const final {
- this->DependsOn(Stream_);
- this->DependsOn(Dict_);
+ TIndexNode* InsertIndexNode(TIndexEntry entry, TIndexNode* currentHead = nullptr) {
+ return &IndexNodes_.emplace_back(entry, currentHead);
}
- const TVector<TType*> ResultJoinItems_;
- const TVector<TType*> LeftStreamItems_;
- const TVector<ui32> LeftKeyColumns_;
- const TVector<ui32> LeftIOMap_;
- IComputationNode* const Stream_;
- IComputationNode* const Dict_;
- const TContainerCacheOnContext KeyTupleCache_;
+private:
+ const std::vector<arrow::ValueDescr> InputsDescr_;
+ const TVector<ui32>& KeyColumns_;
+
+ TVector<std::unique_ptr<IBlockReader>> Readers_;
+ TVector<NUdf::IBlockItemHasher::TPtr> Hashers_;
+ TVector<NUdf::IBlockItemComparator::TPtr> Comparators_;
+
+ std::vector<std::vector<arrow::Datum>> Data_;
+
+ TIndexMap Index_;
+ std::deque<TIndexNode> IndexNodes_;
+
+ NUdf::TUnboxedValue Stream_;
+ TUnboxedValueVector Inputs_;
};
-template<bool RightRequired, bool IsTuple>
-class TBlockWideMultiMapJoinWrapper : public TMutableComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired, IsTuple>>
+template <bool WithoutRight, bool RightRequired, bool RightAny>
+class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>>
{
-using TBaseComputation = TMutableComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired, IsTuple>>;
-using TState = TBlockJoinState<RightRequired>;
+using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>>;
+using TJoinState = TBlockJoinState<RightRequired>;
+using TIndexState = TBlockIndex;
public:
- TBlockWideMultiMapJoinWrapper(TComputationMutables& mutables,
- const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftStreamItems,
- const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap,
- IComputationNode* stream, IComputationNode* dict)
+ TBlockMapJoinCoreWraper(
+ TComputationMutables& mutables,
+ const TVector<TType*>&& resultItemTypes,
+ const TVector<TType*>&& leftItemTypes,
+ const TVector<ui32>&& leftKeyColumns,
+ const TVector<ui32>&& leftIOMap,
+ const TVector<TType*>&& rightItemTypes,
+ const TVector<ui32>&& rightKeyColumns,
+ const TVector<ui32>&& rightIOMap,
+ IComputationNode* leftStream,
+ IComputationNode* rightStream
+ )
: TBaseComputation(mutables, EValueRepresentation::Boxed)
- , ResultJoinItems_(std::move(resultJoinItems))
- , LeftStreamItems_(std::move(leftStreamItems))
+ , ResultItemTypes_(std::move(resultItemTypes))
+ , LeftItemTypes_(std::move(leftItemTypes))
, LeftKeyColumns_(std::move(leftKeyColumns))
, LeftIOMap_(std::move(leftIOMap))
- , Stream_(stream)
- , Dict_(dict)
+ , RightItemTypes_(std::move(rightItemTypes))
+ , RightKeyColumns_(std::move(rightKeyColumns))
+ , RightIOMap_(std::move(rightIOMap))
+ , LeftStream_(std::move(leftStream))
+ , RightStream_(std::move(rightStream))
, KeyTupleCache_(mutables)
{}
NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- NUdf::TUnboxedValue* items = nullptr;
- const auto keys = KeyTupleCache_.NewArray(ctx, LeftKeyColumns_.size(), items);
- const auto state = ctx.HolderFactory.Create<TState>(ctx, LeftStreamItems_,
- LeftIOMap_, ResultJoinItems_);
+ const auto joinState = ctx.HolderFactory.Create<TJoinState>(
+ ctx,
+ LeftItemTypes_,
+ LeftIOMap_,
+ ResultItemTypes_
+ );
+ const auto indexState = ctx.HolderFactory.Create<TIndexState>(
+ RightItemTypes_,
+ RightKeyColumns_,
+ std::move(RightStream_->GetValue(ctx))
+ );
+
return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
- std::move(state),
- std::move(Stream_->GetValue(ctx)),
- std::move(Dict_->GetValue(ctx)),
+ std::move(joinState),
+ std::move(indexState),
+ std::move(LeftStream_->GetValue(ctx)),
+ LeftItemTypes_,
LeftKeyColumns_,
- std::move(keys), items);
+ std::move(RightStream_->GetValue(ctx)),
+ RightItemTypes_,
+ RightKeyColumns_,
+ RightIOMap_
+ );
}
private:
class TStreamValue : public TComputationValue<TStreamValue> {
using TBase = TComputationValue<TStreamValue>;
public:
- TStreamValue(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory,
- NUdf::TUnboxedValue&& blockState, NUdf::TUnboxedValue&& stream,
- NUdf::TUnboxedValue&& dict, const TVector<ui32>& leftKeyColumns,
- NUdf::TUnboxedValue&& keyValue, NUdf::TUnboxedValue* keyItems)
+ TStreamValue(
+ TMemoryUsageInfo* memInfo,
+ const THolderFactory& holderFactory,
+ NUdf::TUnboxedValue&& joinState,
+ NUdf::TUnboxedValue&& indexState,
+ NUdf::TUnboxedValue&& leftStream,
+ const TVector<TType*>& leftTypes,
+ const TVector<ui32>& leftKeyColumns,
+ NUdf::TUnboxedValue&& rightStream,
+ const TVector<TType*>& rightTypes,
+ const TVector<ui32>& rightKeyColumns,
+ const TVector<ui32>& rightIOMap
+ )
: TBase(memInfo)
- , BlockState_(blockState)
- , Stream_(stream)
- , Dict_(dict)
- , KeyValue_(keyValue)
- , KeyItems_(keyItems)
- , List_(NUdf::TUnboxedValue::Invalid())
- , Iterator_(NUdf::TUnboxedValue::Invalid())
- , Current_(NUdf::TUnboxedValue::Invalid())
+ , JoinState_(joinState)
+ , IndexState_(indexState)
+ , LeftStream_(leftStream)
+ , LeftItemTypes_(leftTypes)
, LeftKeyColumns_(leftKeyColumns)
+ , RightStream_(rightStream)
+ , RightItemTypes_(rightTypes)
+ , RightKeyColumns_(rightKeyColumns)
+ , RightIOMap_(rightIOMap)
, HolderFactory_(holderFactory)
{}
private:
NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
- auto& blockState = *static_cast<TState*>(BlockState_.AsBoxed().Get());
- auto* inputFields = blockState.GetRawInputFields();
- const size_t inputWidth = blockState.GetInputWidth();
- const size_t outputWidth = blockState.GetOutputWidth();
+ auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
+ auto& indexState = *static_cast<TIndexState*>(IndexState_.AsBoxed().Get());
+
+ if (!RightStreamConsumed_) {
+ auto fetchStatus = NUdf::EFetchStatus::Ok;
+ while (fetchStatus != NUdf::EFetchStatus::Finish) {
+ fetchStatus = indexState.FetchStream();
+ if (fetchStatus == NUdf::EFetchStatus::Yield) {
+ return NUdf::EFetchStatus::Yield;
+ }
+ }
+
+ RightStreamConsumed_ = true;
+ }
+
+ auto* inputFields = joinState.GetRawInputFields();
+ const size_t inputWidth = joinState.GetInputWidth();
+ const size_t outputWidth = joinState.GetOutputWidth();
MKQL_ENSURE(width == outputWidth,
"The given width doesn't equal to the result type size");
- while (!blockState.HasBlocks()) {
- if (!Iterator_.IsInvalid()) {
- // Process the remaining items from the iterator.
- while (blockState.IsNotFull() && Iterator_.Next(Current_)) {
- blockState.MakeRow(Current_);
- }
- }
- if (blockState.IsNotFull() && blockState.NextRow()) {
- const auto key = MakeKeysTuple(blockState);
- // Lookup the item in the right dict. If the lookup succeeds,
- // reset the iterator and proceed the execution from the
- // beginning of the outer loop. Otherwise, the iterator is
- // already invalidated (i.e. finished), so the execution will
- // process the next tuple from the left stream.
- if (key && (List_ = Dict_.Lookup(key))) {
- Iterator_ = List_.GetListIterator();
+ std::vector<NYql::NUdf::TBlockItem> leftKeyColumns(LeftKeyColumns_.size());
+ std::vector<ui64> leftKeyColumnHashes(LeftKeyColumns_.size());
+ std::vector<NYql::NUdf::TBlockItem> rightRow(RightIOMap_.size());
+
+ while (!joinState.HasBlocks()) {
+ while (joinState.IsNotFull() && LookupBatchCurrent_ < LookupBatchSize_) {
+ auto& iter = LookupBatchIterators_[LookupBatchCurrent_];
+ if constexpr (WithoutRight) {
+ if (bool(iter.IsEmpty()) != RightRequired) {
+ joinState.CopyRow();
+ }
+
+ joinState.NextRow();
+ LookupBatchCurrent_++;
+ continue;
} else if constexpr (!RightRequired) {
- blockState.MakeRow(NUdf::TUnboxedValue());
+ if (iter.IsEmpty()) {
+ joinState.MakeRow(std::vector<NYql::NUdf::TBlockItem>());
+ joinState.NextRow();
+ LookupBatchCurrent_++;
+ continue;
+ }
}
+
+ while (joinState.IsNotFull() && !iter.IsEmpty()) {
+ auto key = iter.Next();
+ indexState.GetRow(*key, RightIOMap_, rightRow);
+ joinState.MakeRow(rightRow);
+
+ if constexpr (RightAny) {
+ break;
+ }
+ }
+
+ if (RightAny || iter.IsEmpty()) {
+ joinState.NextRow();
+ LookupBatchCurrent_++;
+ }
+ }
+
+ if (joinState.IsNotFull() && joinState.RemainingRowsCount() > 0) {
+ LookupBatchSize_ = std::min(PrefetchBatchSize, static_cast<ui32>(joinState.RemainingRowsCount()));
+ indexState.BatchLookup(LookupBatchSize_, LookupBatchIterators_, [&](size_t i) {
+ MakeLeftKeys(leftKeyColumns, leftKeyColumnHashes, i);
+ ui64 keyHash = CalculateTupleHash(leftKeyColumnHashes);
+ return std::make_pair(std::ref(leftKeyColumns), keyHash);
+ });
+
+ LookupBatchCurrent_ = 0;
continue;
}
- if (blockState.IsNotFull() && !blockState.IsFinished()) {
- switch (Stream_.WideFetch(inputFields, inputWidth)) {
+
+ if (joinState.IsNotFull() && !joinState.IsFinished()) {
+ switch (LeftStream_.WideFetch(inputFields, inputWidth)) {
case NUdf::EFetchStatus::Yield:
return NUdf::EFetchStatus::Yield;
case NUdf::EFetchStatus::Ok:
- blockState.Reset();
+ joinState.Reset();
continue;
case NUdf::EFetchStatus::Finish:
- blockState.Finish();
+ joinState.Finish();
break;
}
// Leave the loop, if no values left in the stream.
- Y_DEBUG_ABORT_UNLESS(blockState.IsFinished());
+ Y_DEBUG_ABORT_UNLESS(joinState.IsFinished());
}
- if (blockState.IsEmpty()) {
+ if (joinState.IsEmpty()) {
return NUdf::EFetchStatus::Finish;
}
- blockState.MakeBlocks(HolderFactory_);
+ joinState.MakeBlocks(HolderFactory_);
}
- const auto sliceSize = blockState.Slice();
+ const auto sliceSize = joinState.Slice();
for (size_t i = 0; i < outputWidth; i++) {
- output[i] = blockState.Get(sliceSize, HolderFactory_, i);
+ output[i] = joinState.Get(sliceSize, HolderFactory_, i);
}
return NUdf::EFetchStatus::Ok;
}
- NUdf::TUnboxedValue MakeKeysTuple(const TState& state) const {
- // TODO: Handle converters.
- if constexpr (!IsTuple) {
- return state.GetValue(HolderFactory_, LeftKeyColumns_.front());
- }
+ void MakeLeftKeys(std::vector<NYql::NUdf::TBlockItem>& items, std::vector<ui64>& hashes, size_t offset) const {
+ auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
- Y_ABORT_IF(KeyItems_ == nullptr);
+ Y_ENSURE(items.size() == LeftKeyColumns_.size());
+ Y_ENSURE(hashes.size() == LeftKeyColumns_.size());
for (size_t i = 0; i < LeftKeyColumns_.size(); i++) {
- KeyItems_[i] = state.GetValue(HolderFactory_, LeftKeyColumns_[i]);
+ std::tie(items[i], hashes[i]) = joinState.GetItemWithHash(LeftKeyColumns_[i], offset);
}
- return KeyValue_;
}
- NUdf::TUnboxedValue BlockState_;
- NUdf::TUnboxedValue Stream_;
- NUdf::TUnboxedValue Dict_;
- NUdf::TUnboxedValue KeyValue_;
- NUdf::TUnboxedValue* KeyItems_;
-
- NUdf::TUnboxedValue List_;
- NUdf::TUnboxedValue Iterator_;
- NUdf::TUnboxedValue Current_;
+ NUdf::TUnboxedValue JoinState_;
+ NUdf::TUnboxedValue IndexState_;
+ NUdf::TUnboxedValue LeftStream_;
+ const TVector<TType*>& LeftItemTypes_;
const TVector<ui32>& LeftKeyColumns_;
+
+ NUdf::TUnboxedValue RightStream_;
+ const TVector<TType*>& RightItemTypes_;
+ const TVector<ui32>& RightKeyColumns_;
+ const TVector<ui32>& RightIOMap_;
+ bool RightStreamConsumed_ = false;
+
+ std::array<TBlockIndex::TIterator, PrefetchBatchSize> LookupBatchIterators_;
+ ui32 LookupBatchCurrent_ = 0;
+ ui32 LookupBatchSize_ = 0;
+
const THolderFactory& HolderFactory_;
};
void RegisterDependencies() const final {
- this->DependsOn(Stream_);
- this->DependsOn(Dict_);
+ this->DependsOn(LeftStream_);
+ this->DependsOn(RightStream_);
}
- const TVector<TType*> ResultJoinItems_;
- const TVector<TType*> LeftStreamItems_;
+private:
+ const TVector<TType*> ResultItemTypes_;
+
+ const TVector<TType*> LeftItemTypes_;
const TVector<ui32> LeftKeyColumns_;
const TVector<ui32> LeftIOMap_;
- IComputationNode* const Stream_;
- IComputationNode* const Dict_;
+
+ const TVector<TType*> RightItemTypes_;
+ const TVector<ui32> RightKeyColumns_;
+ const TVector<ui32> RightIOMap_;
+
+ IComputationNode* const LeftStream_;
+ IComputationNode* const RightStream_;
+
const TContainerCacheOnContext KeyTupleCache_;
};
} // namespace
IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args");
+ MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
const auto joinType = callable.GetType()->GetReturnType();
MKQL_ENSURE(joinType->IsStream(), "Expected WideStream as a resulting stream");
@@ -510,16 +896,14 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
MKQL_ENSURE(leftStreamComponents.size() > 0, "Expected at least one column");
const TVector<TType*> leftStreamItems(leftStreamComponents.cbegin(), leftStreamComponents.cend());
- const auto rightDictNode = callable.GetInput(1);
- MKQL_ENSURE(rightDictNode.GetStaticType()->IsDict(),
- "Expected Dict as a right join part");
- const auto rightDictType = AS_TYPE(TDictType, rightDictNode)->GetPayloadType();
- const auto isMulti = rightDictType->IsList();
- const auto rightDictItemType = isMulti
- ? AS_TYPE(TListType, rightDictType)->GetItemType()
- : rightDictType;
- MKQL_ENSURE(rightDictItemType->IsVoid() || rightDictItemType->IsTuple(),
- "Expected Void or Tuple as a right dict item type");
+ const auto rightType = callable.GetInput(1).GetStaticType();
+ MKQL_ENSURE(rightType->IsStream(), "Expected WideStream as a right stream");
+ const auto rightStreamType = AS_TYPE(TStreamType, rightType);
+ MKQL_ENSURE(rightStreamType->GetItemType()->IsMulti(),
+ "Expected Multi as a right stream item type");
+ const auto rightStreamComponents = GetWideComponents(rightStreamType);
+ MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column");
+ const TVector<TType*> rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend());
const auto joinKindNode = callable.GetInput(2);
const auto rawKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();
@@ -527,34 +911,61 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left ||
joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly);
- const auto keyColumnsLiteral = callable.GetInput(3);
- const auto keyColumnsTuple = AS_VALUE(TTupleLiteral, keyColumnsLiteral);
+ const auto leftKeyColumnsLiteral = callable.GetInput(3);
+ const auto leftKeyColumnsTuple = AS_VALUE(TTupleLiteral, leftKeyColumnsLiteral);
TVector<ui32> leftKeyColumns;
- leftKeyColumns.reserve(keyColumnsTuple->GetValuesCount());
- for (ui32 i = 0; i < keyColumnsTuple->GetValuesCount(); i++) {
- const auto item = AS_VALUE(TDataLiteral, keyColumnsTuple->GetValue(i));
+ leftKeyColumns.reserve(leftKeyColumnsTuple->GetValuesCount());
+ for (ui32 i = 0; i < leftKeyColumnsTuple->GetValuesCount(); i++) {
+ const auto item = AS_VALUE(TDataLiteral, leftKeyColumnsTuple->GetValue(i));
leftKeyColumns.emplace_back(item->AsValue().Get<ui32>());
}
- const bool isTupleKey = leftKeyColumns.size() > 1;
+ const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend());
- const auto keyDropsLiteral = callable.GetInput(4);
- const auto keyDropsTuple = AS_VALUE(TTupleLiteral, keyDropsLiteral);
+ const auto leftKeyDropsLiteral = callable.GetInput(4);
+ const auto leftKeyDropsTuple = AS_VALUE(TTupleLiteral, leftKeyDropsLiteral);
THashSet<ui32> leftKeyDrops;
- leftKeyDrops.reserve(keyDropsTuple->GetValuesCount());
- for (ui32 i = 0; i < keyDropsTuple->GetValuesCount(); i++) {
- const auto item = AS_VALUE(TDataLiteral, keyDropsTuple->GetValue(i));
+ leftKeyDrops.reserve(leftKeyDropsTuple->GetValuesCount());
+ for (ui32 i = 0; i < leftKeyDropsTuple->GetValuesCount(); i++) {
+ const auto item = AS_VALUE(TDataLiteral, leftKeyDropsTuple->GetValue(i));
leftKeyDrops.emplace(item->AsValue().Get<ui32>());
}
- const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend());
for (const auto& drop : leftKeyDrops) {
MKQL_ENSURE(leftKeySet.contains(drop),
"Only key columns has to be specified in drop column set");
+ }
+ const auto rightKeyColumnsLiteral = callable.GetInput(5);
+ const auto rightKeyColumnsTuple = AS_VALUE(TTupleLiteral, rightKeyColumnsLiteral);
+ TVector<ui32> rightKeyColumns;
+ rightKeyColumns.reserve(rightKeyColumnsTuple->GetValuesCount());
+ for (ui32 i = 0; i < rightKeyColumnsTuple->GetValuesCount(); i++) {
+ const auto item = AS_VALUE(TDataLiteral, rightKeyColumnsTuple->GetValue(i));
+ rightKeyColumns.emplace_back(item->AsValue().Get<ui32>());
+ }
+ const THashSet<ui32> rightKeySet(rightKeyColumns.cbegin(), rightKeyColumns.cend());
+
+ const auto rightKeyDropsLiteral = callable.GetInput(6);
+ const auto rightKeyDropsTuple = AS_VALUE(TTupleLiteral, rightKeyDropsLiteral);
+ THashSet<ui32> rightKeyDrops;
+ rightKeyDrops.reserve(rightKeyDropsTuple->GetValuesCount());
+ for (ui32 i = 0; i < rightKeyDropsTuple->GetValuesCount(); i++) {
+ const auto item = AS_VALUE(TDataLiteral, rightKeyDropsTuple->GetValue(i));
+ rightKeyDrops.emplace(item->AsValue().Get<ui32>());
}
- TVector<ui32> leftIOMap;
+ for (const auto& drop : rightKeyDrops) {
+ MKQL_ENSURE(rightKeySet.contains(drop),
+ "Only key columns has to be specified in drop column set");
+ }
+
+ MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key columns mismatch");
+
+ const auto rightAnyNode = callable.GetInput(7);
+ const auto rightAny = AS_VALUE(TDataLiteral, rightAnyNode)->AsValue().Get<bool>();
+
// XXX: Mind the last wide item, containing block length.
+ TVector<ui32> leftIOMap;
for (size_t i = 0; i < leftStreamItems.size() - 1; i++) {
if (leftKeyDrops.contains(i)) {
continue;
@@ -562,51 +973,70 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo
leftIOMap.push_back(i);
}
- const auto stream = LocateNode(ctx.NodeLocator, callable, 0);
- const auto dict = LocateNode(ctx.NodeLocator, callable, 1);
-
-#define DISPATCH_JOIN(IS_TUPLE) do { \
- switch (joinKind) { \
- case EJoinKind::Inner: \
- if (isMulti) { \
- return new TBlockWideMultiMapJoinWrapper<true, IS_TUPLE>(ctx.Mutables, \
- std::move(joinItems), std::move(leftStreamItems), \
- std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \
- } \
- return new TBlockWideMapJoinWrapper<false, true, IS_TUPLE>(ctx.Mutables, \
- std::move(joinItems), std::move(leftStreamItems), \
- std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \
- case EJoinKind::Left: \
- if (isMulti) { \
- return new TBlockWideMultiMapJoinWrapper<false, IS_TUPLE>(ctx.Mutables, \
- std::move(joinItems), std::move(leftStreamItems), \
- std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \
- } \
- return new TBlockWideMapJoinWrapper<false, false, IS_TUPLE>(ctx.Mutables, \
- std::move(joinItems), std::move(leftStreamItems), \
- std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \
- case EJoinKind::LeftSemi: \
- return new TBlockWideMapJoinWrapper<true, true, IS_TUPLE>(ctx.Mutables, \
- std::move(joinItems), std::move(leftStreamItems), \
- std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \
- case EJoinKind::LeftOnly: \
- return new TBlockWideMapJoinWrapper<true, false, IS_TUPLE>(ctx.Mutables, \
- std::move(joinItems), std::move(leftStreamItems), \
- std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \
- default: \
- /* TODO: Display the human-readable join kind name. */ \
- MKQL_ENSURE(false, "BlockMapJoinCore doesn't support join type #" \
- << static_cast<ui32>(joinKind)); \
- } \
-} while(0)
-
- if (isTupleKey) {
- DISPATCH_JOIN(true);
+ // XXX: Mind the last wide item, containing block length.
+ TVector<ui32> rightIOMap;
+ if (joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left) {
+ for (size_t i = 0; i < rightStreamItems.size() - 1; i++) {
+ if (rightKeyDrops.contains(i)) {
+ continue;
+ }
+ rightIOMap.push_back(i);
+ }
} else {
- DISPATCH_JOIN(false);
+ MKQL_ENSURE(rightKeyDrops.empty(), "Right key drops are not allowed for semi/only join");
+ }
+
+ const auto leftStream = LocateNode(ctx.NodeLocator, callable, 0);
+ const auto rightStream = LocateNode(ctx.NodeLocator, callable, 1);
+
+#define JOIN_WRAPPER(WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY) \
+ return new TBlockMapJoinCoreWraper<WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY>( \
+ ctx.Mutables, \
+ std::move(joinItems), \
+ std::move(leftStreamItems), \
+ std::move(leftKeyColumns), \
+ std::move(leftIOMap), \
+ std::move(rightStreamItems), \
+ std::move(rightKeyColumns), \
+ std::move(rightIOMap), \
+ leftStream, \
+ rightStream \
+ )
+
+ switch (joinKind) {
+ case EJoinKind::Inner:
+ if (rightAny) {
+ JOIN_WRAPPER(false, true, true);
+ } else {
+ JOIN_WRAPPER(false, true, false);
+ }
+ case EJoinKind::Left:
+ if (rightAny) {
+ JOIN_WRAPPER(false, false, true);
+ } else {
+ JOIN_WRAPPER(false, false, false);
+ }
+ case EJoinKind::LeftSemi:
+ MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left semi join");
+ if (rightAny) {
+ JOIN_WRAPPER(true, true, true);
+ } else {
+ JOIN_WRAPPER(true, true, false);
+ }
+ case EJoinKind::LeftOnly:
+ MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left only join");
+ if (rightAny) {
+ JOIN_WRAPPER(true, false, true);
+ } else {
+ JOIN_WRAPPER(true, false, false);
+ }
+ default:
+ /* TODO: Display the human-readable join kind name. */
+ MKQL_ENSURE(false, "BlockMapJoinCore doesn't support join type #"
+ << static_cast<ui32>(joinKind));
}
-#undef DISPATCH_JOIN
+#undef JOIN_WRAPPER
}
} // namespace NMiniKQL
diff --git a/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h b/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h
index 1909b2e714..f5c40a6944 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h
+++ b/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h
@@ -31,7 +31,7 @@ struct TRobinHoodBatchRequestItem {
// Copy-constructs `key` in place inside the KeyStorage buffer using placement new.
// NOTE(review): nothing visible here destroys the constructed key; confirm the owner
// handles that for TKey types with a non-trivial destructor.
void ConstructKey(const TKey& key) {
new (KeyStorage) TKey(key);
}
-
+
// intermediate data
ui64 Hash;
char* InitialIterator;
@@ -114,6 +114,14 @@ public:
}
}
+ // Point lookup: hashes `key`, derives its starting cell and probes via LookupImpl. Returns a pointer to the matching cell, or nullptr if the key is not present (no cell is created on a miss).
+ Y_FORCE_INLINE char* Lookup(TKey key) {
+ auto hash = HashLocal(key);
+ auto ptr = MakeIterator(hash, Data, CapacityShift); // cell at which probing starts for this hash
+ auto ret = LookupImpl(key, hash, Data, DataEnd, ptr);
+ return ret;
+ }
+
template <typename TSink>
Y_NO_INLINE void BatchInsert(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) {
while (2 * (Size + batchRequest.size()) >= Capacity) {
@@ -136,6 +144,22 @@ public:
}
}
+ template <typename TSink> // TSink: callable invoked as sink(requestIndex, cellPointer)
+ Y_NO_INLINE void BatchLookup(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) { // batched counterpart of Lookup; fills Hash/InitialIterator of every request
+ for (size_t i = 0; i < batchRequest.size(); ++i) { // pass 1: compute the hash and starting cell of every request and issue a read prefetch for it
+ auto& r = batchRequest[i];
+ r.Hash = HashLocal(r.GetKey());
+ r.InitialIterator = MakeIterator(r.Hash, Data, CapacityShift);
+ NYql::PrefetchForRead(r.InitialIterator);
+ }
+
+ for (size_t i = 0; i < batchRequest.size(); ++i) { // pass 2: probe each request starting from the cell prefetched in pass 1
+ auto& r = batchRequest[i];
+ auto iter = LookupImpl(r.GetKey(), r.Hash, Data, DataEnd, r.InitialIterator);
+ sink(i, iter); // called once per request, in request order; iter is nullptr when the key is absent
+ }
+ }
+
// Returns the current value of Capacity.
ui64 GetCapacity() const {
return Capacity;
}
@@ -215,7 +239,7 @@ private:
};
Y_FORCE_INLINE char* MakeIterator(const ui64 hash, char* data, ui64 capacityShift) {
- // https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
+ // https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
ui64 bucket = ((SelfHash ^ hash) * 11400714819323198485llu) >> capacityShift;
char* ptr = data + AsDeriv().GetCellSize() * bucket;
return ptr;
@@ -283,6 +307,29 @@ private:
}
}
+ Y_FORCE_INLINE char* LookupImpl(TKey key, const ui64 hash, char* data, char* dataEnd, char* ptr) { // probes forward from `ptr` looking for `key`; returns its cell or nullptr when absent
+ i32 currDistance = 0; // how many cells we have advanced from the starting cell
+ for (;;) {
+ auto& pslPtr = GetPSL(ptr);
+ if (pslPtr.Distance < 0 || currDistance > pslPtr.Distance) { // stop: Distance < 0 (presumably an empty cell — confirm), or the resident's Distance is smaller than ours
+ return nullptr;
+ }
+
+ if constexpr (CacheHash) { // with cached hashes, compare the stored hash first and the keys only on a hash match
+ if (pslPtr.Hash == hash && EqualLocal(GetKey(ptr), key)) {
+ return ptr;
+ }
+ } else {
+ if (EqualLocal(GetKey(ptr), key)) {
+ return ptr;
+ }
+ }
+
+ ++currDistance;
+ AdvancePointer(ptr, data, dataEnd); // NOTE(review): presumably steps to the next cell, wrapping from dataEnd back to data — confirm
+ }
+ }
+
Y_NO_INLINE void Grow() {
ui64 growFactor;
if (Capacity < 100'000) {
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
index 1a7546eeed..f1cb1ea1aa 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp
@@ -1,1985 +1,995 @@
+#include "mkql_block_map_join_ut_utils.h"
#include "mkql_computation_node_ut.h"
-#include <arrow/array/builder_binary.h>
-#include <arrow/array/builder_primitive.h>
-#include <arrow/compute/kernel.h>
-#include <yql/essentials/minikql/computation/mkql_block_builder.h>
-#include <yql/essentials/minikql/computation/mkql_block_impl.h>
-#include <yql/essentials/minikql/computation/mkql_block_reader.h>
-#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
-#include <yql/essentials/public/udf/arrow/udf_arrow_helpers.h>
namespace NKikimr {
namespace NMiniKQL {
namespace {
-const TRuntimeNode MakeSet(TProgramBuilder& pgmBuilder,
- const TVector<const TRuntimeNode>& keys
-) {
- const auto keysList = keys.size() > 1 ? pgmBuilder.Zip(keys) : keys.front();
-
- return pgmBuilder.ToHashedDict(keysList, false,
- [&](TRuntimeNode item) {
- return item;
- }, [&](TRuntimeNode) {
- return pgmBuilder.NewVoid();
- });
-}
-
-const TRuntimeNode MakeDict(TProgramBuilder& pgmBuilder,
- const TVector<const TRuntimeNode>& keys,
- const TVector<const TRuntimeNode>& payloads
-) {
- const auto keysList = keys.size() > 1 ? pgmBuilder.Zip(keys) : keys.front();
- // TODO: Process containers properly. Now just use Zip to pack
- // the data type in a tuple.
- TVector<const TRuntimeNode> wrappedPayloads;
- std::transform(payloads.cbegin(), payloads.cend(), std::back_inserter(wrappedPayloads),
- [&](const auto payload) {
- return pgmBuilder.Zip({payload});
- });
- TVector<const TRuntimeNode> pairsChunks;
- std::transform(wrappedPayloads.cbegin(), wrappedPayloads.cend(), std::back_inserter(pairsChunks),
- [&](const auto payload) {
- return pgmBuilder.Zip({keysList, payload});
- });
- const auto pairsList = pgmBuilder.Extend(pairsChunks);
-
- return pgmBuilder.ToHashedDict(pairsList, payloads.size() > 1,
- [&](TRuntimeNode item) {
- return pgmBuilder.Nth(item, 0);
- }, [&](TRuntimeNode item) {
- return pgmBuilder.Nth(item, 1);
- });
-}
-
-// XXX: Copy-pasted from program builder sources. Adjusted on demand.
-const std::vector<TType*> ValidateBlockStreamType(const TType* streamType) {
- const auto wideComponents = GetWideComponents(AS_TYPE(TStreamType, streamType));
- Y_ENSURE(wideComponents.size() > 0, "Expected at least one column");
- std::vector<TType*> items;
- items.reserve(wideComponents.size());
- // XXX: Declare these variables outside the loop body to use for the last
- // item (i.e. block length column) in the assertions below.
- bool isScalar;
- TType* itemType;
- for (const auto& wideComponent : wideComponents) {
- auto blockType = AS_TYPE(TBlockType, wideComponent);
- isScalar = blockType->GetShape() == TBlockType::EShape::Scalar;
- itemType = blockType->GetItemType();
- items.push_back(blockType);
- }
-
- Y_ENSURE(isScalar, "Last column should be scalar");
- Y_ENSURE(AS_TYPE(TDataType, itemType)->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected Uint64");
- return items;
-}
-
-bool IsOptionalOrNull(const TType* type) {
- return type->IsOptional() || type->IsNull() || type->IsPg();
-}
-
-const TRuntimeNode BuildBlockJoin(TProgramBuilder& pgmBuilder, EJoinKind joinKind,
- const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops,
- TRuntimeNode& leftArg, TType* leftTuple, const TRuntimeNode& dictNode
-) {
- // 1. Make left argument node.
- const auto tupleType = AS_TYPE(TTupleType, leftTuple);
- const auto listTupleType = pgmBuilder.NewListType(leftTuple);
- leftArg = pgmBuilder.Arg(listTupleType);
-
- // 2. Make left wide stream node.
- const auto leftWideStream = pgmBuilder.FromFlow(pgmBuilder.ExpandMap(pgmBuilder.ToFlow(leftArg),
+// List<Tuple<...>> -> Stream<Multi<...>>
+TRuntimeNode ToWideStream(TProgramBuilder& pgmBuilder, TRuntimeNode list) {
+ auto wideFlow = pgmBuilder.ExpandMap(pgmBuilder.ToFlow(list),
[&](TRuntimeNode tupleNode) -> TRuntimeNode::TList {
+ TTupleType* tupleType = AS_TYPE(TTupleType, tupleNode.GetStaticType());
TRuntimeNode::TList wide;
wide.reserve(tupleType->GetElementsCount());
for (size_t i = 0; i < tupleType->GetElementsCount(); i++) {
wide.emplace_back(pgmBuilder.Nth(tupleNode, i));
}
return wide;
- }));
-
- // 3. Calculate the resulting join type.
- const auto leftStreamItems = ValidateBlockStreamType(leftWideStream.GetStaticType());
- const THashSet<ui32> leftKeyDropsSet(leftKeyDrops.cbegin(), leftKeyDrops.cend());
- TVector<TType*> returnJoinItems;
- for (size_t i = 0; i < leftStreamItems.size(); i++) {
- if (leftKeyDropsSet.contains(i)) {
- continue;
- }
- returnJoinItems.push_back(leftStreamItems[i]);
- }
-
- const auto payloadType = AS_TYPE(TDictType, dictNode.GetStaticType())->GetPayloadType();
- const auto payloadItemType = payloadType->IsList()
- ? AS_TYPE(TListType, payloadType)->GetItemType()
- : payloadType;
- if (joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left) {
- // XXX: This is the contract ensured by the expression compiler and
- // optimizers to ease the processing of the dict payload in wide context.
- Y_ENSURE(payloadItemType->IsTuple(), "Dict payload has to be a Tuple");
- const auto payloadItems = AS_TYPE(TTupleType, payloadItemType)->GetElements();
- TVector<TType*> dictBlockItems;
- dictBlockItems.reserve(payloadItems.size());
- for (const auto& payloadItem : payloadItems) {
- MKQL_ENSURE(!payloadItem->IsBlock(), "Dict payload item has to be non-block");
- const auto itemType = joinKind == EJoinKind::Inner ? payloadItem
- : IsOptionalOrNull(payloadItem) ? payloadItem
- : pgmBuilder.NewOptionalType(payloadItem);
- dictBlockItems.emplace_back(pgmBuilder.NewBlockType(itemType, TBlockType::EShape::Many));
}
- // Block length column has to be the last column in wide block stream item,
- // so all contents of the dict payload should be appended to the resulting
- // wide type before the block size column.
- const auto blockLenPos = std::prev(returnJoinItems.end());
- returnJoinItems.insert(blockLenPos, dictBlockItems.cbegin(), dictBlockItems.cend());
- } else {
- // XXX: This is the contract ensured by the expression compiler and
- // optimizers for join types that don't require the right (i.e. dict) part.
- Y_ENSURE(payloadItemType->IsVoid(), "Dict payload has to be Void");
- }
- TType* returnJoinType = pgmBuilder.NewStreamType(pgmBuilder.NewMultiType(returnJoinItems));
+ );
- // 4. Build BlockMapJoinCore node.
- const auto joinNode = pgmBuilder.BlockMapJoinCore(leftWideStream, dictNode, joinKind,
- leftKeyColumns, leftKeyDrops,
- returnJoinType);
-
- // 5. Build the root node with list of tuples.
- const auto joinItems = GetWideComponents(AS_TYPE(TStreamType, joinNode.GetStaticType()));
- const auto resultType = AS_TYPE(TTupleType, pgmBuilder.NewTupleType(joinItems));
+ return pgmBuilder.FromFlow(wideFlow);
+}
- const auto rootNode = pgmBuilder.Collect(pgmBuilder.FromFlow(pgmBuilder.NarrowMap(pgmBuilder.ToFlow(joinNode),
+// Stream<Multi<...>> -> List<Tuple<...>>: narrows every wide item into a tuple of all its columns and collects the tuples into a list.
+TRuntimeNode FromWideStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream) {
+ return pgmBuilder.Collect(pgmBuilder.NarrowMap(pgmBuilder.ToFlow(stream),
[&](TRuntimeNode::TList items) -> TRuntimeNode {
TVector<TRuntimeNode> tupleElements;
- tupleElements.reserve(resultType->GetElementsCount());
- for (size_t i = 0; i < resultType->GetElementsCount(); i++) {
+ tupleElements.reserve(items.size()); // tuple width is taken from the lambda argument itself, no external result type needed
+ for (size_t i = 0; i < items.size(); i++) {
tupleElements.emplace_back(items[i]);
}
return pgmBuilder.NewTuple(tupleElements);
- })));
-
- return rootNode;
+ })
+ );
}
-NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize,
- const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values
+TRuntimeNode BuildBlockJoin(TProgramBuilder& pgmBuilder, EJoinKind joinKind,
+ TRuntimeNode leftList, const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops,
+ TRuntimeNode rightList, const TVector<ui32>& rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny
) {
- const auto maxLength = CalcBlockLen(std::accumulate(types.cbegin(), types.cend(), 0ULL,
- [](size_t max, const TType* type) {
- return std::max(max, CalcMaxBlockItemSize(type));
- }));
- TVector<std::unique_ptr<IArrayBuilder>> builders;
- std::transform(types.cbegin(), types.cend(), std::back_inserter(builders),
- [&](const auto& type) {
- return MakeArrayBuilder(TTypeInfoHelper(), type, ctx.ArrowMemoryPool,
- maxLength, &ctx.Builder->GetPgBuilder());
- });
-
- const auto& holderFactory = ctx.HolderFactory;
- const size_t width = types.size();
- const size_t total = values.GetListLength();
- NUdf::TUnboxedValue iterator = values.GetListIterator();
- NUdf::TUnboxedValue current;
- size_t converted = 0;
- TDefaultListRepresentation listValues;
- while (converted < total) {
- for (size_t i = 0; i < blockSize && iterator.Next(current); i++, converted++) {
- for (size_t j = 0; j < builders.size(); j++) {
- const NUdf::TUnboxedValuePod& item = current.GetElement(j);
- builders[j]->Add(item);
- }
- }
- NUdf::TUnboxedValue* items = nullptr;
- const auto tuple = holderFactory.CreateDirectArrayHolder(width + 1, items);
- for (size_t i = 0; i < width; i++) {
- items[i] = holderFactory.CreateArrowBlock(builders[i]->Build(converted >= total));
- }
- items[width] = MakeBlockCount(holderFactory, blockSize);
- listValues = listValues.Append(std::move(tuple));
- }
- return holderFactory.CreateDirectListHolder(std::move(listValues));
-}
+ const auto leftStream = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, leftList));
+ const auto rightStream = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, rightList));
-NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx,
- const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values
-) {
- TVector<std::unique_ptr<IBlockReader>> readers;
- TVector<std::unique_ptr<IBlockItemConverter>> converters;
- for (const auto& type : types) {
- const auto blockItemType = AS_TYPE(TBlockType, type)->GetItemType();
- readers.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
- converters.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType,
- ctx.Builder->GetPgBuilder()));
+ const auto leftStreamItems = ValidateBlockStreamType(leftStream.GetStaticType());
+ const auto rightStreamItems = ValidateBlockStreamType(rightStream.GetStaticType());
+
+ TVector<TType*> joinReturnItems;
+
+ const THashSet<ui32> leftKeyDropsSet(leftKeyDrops.cbegin(), leftKeyDrops.cend());
+ for (size_t i = 0; i < leftStreamItems.size() - 1; i++) { // Excluding block size
+ if (leftKeyDropsSet.contains(i)) {
+ continue;
+ }
+ joinReturnItems.push_back(pgmBuilder.NewBlockType(leftStreamItems[i], TBlockType::EShape::Many));
}
- const auto& holderFactory = ctx.HolderFactory;
- const size_t width = types.size() - 1;
- TDefaultListRepresentation listValues;
- NUdf::TUnboxedValue iterator = values.GetListIterator();
- NUdf::TUnboxedValue current;
- while (iterator.Next(current)) {
- const auto blockLengthValue = current.GetElement(width);
- const auto blockLengthDatum = TArrowBlock::From(blockLengthValue).GetDatum();
- Y_ENSURE(blockLengthDatum.is_scalar());
- const auto blockLength = blockLengthDatum.scalar_as<arrow::UInt64Scalar>().value;
- for (size_t i = 0; i < blockLength; i++) {
- NUdf::TUnboxedValue* items = nullptr;
- const auto tuple = holderFactory.CreateDirectArrayHolder(width, items);
- for (size_t j = 0; j < width; j++) {
- const auto arrayValue = current.GetElement(j);
- const auto arrayDatum = TArrowBlock::From(arrayValue).GetDatum();
- UNIT_ASSERT(arrayDatum.is_array());
- const auto blockItem = readers[j]->GetItem(*arrayDatum.array(), i);
- items[j] = converters[j]->MakeValue(blockItem, holderFactory);
+ if (joinKind != EJoinKind::LeftSemi && joinKind != EJoinKind::LeftOnly) {
+ const THashSet<ui32> rightKeyDropsSet(rightKeyDrops.cbegin(), rightKeyDrops.cend());
+ for (size_t i = 0; i < rightStreamItems.size() - 1; i++) { // Excluding block size
+ if (rightKeyDropsSet.contains(i)) {
+ continue;
}
- listValues = listValues.Append(std::move(tuple));
+
+ joinReturnItems.push_back(pgmBuilder.NewBlockType(
+ joinKind == EJoinKind::Inner ? rightStreamItems[i]
+ : IsOptionalOrNull(rightStreamItems[i]) ? rightStreamItems[i]
+ : pgmBuilder.NewOptionalType(rightStreamItems[i]),
+ TBlockType::EShape::Many
+ ));
}
}
- return holderFactory.CreateDirectListHolder(std::move(listValues));
-}
-NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup,
- const TType* leftType, const NUdf::TUnboxedValue& leftListValue,
- const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops,
- const TRuntimeNode& rightNode, EJoinKind joinKind, size_t blockSize
-) {
- TProgramBuilder& pb = *setup.PgmBuilder;
-
- // 1. Prepare block type for the input produced by the left node.
- Y_ENSURE(leftType->IsList(), "Left node has to be list");
- const auto leftListType = AS_TYPE(TListType, leftType)->GetItemType();
- Y_ENSURE(leftListType->IsTuple(), "List item has to be tuple");
- const auto leftItems = AS_TYPE(TTupleType, leftListType)->GetElements();
- const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
- const auto blockLenType = pb.NewBlockType(ui64Type, TBlockType::EShape::Scalar);
- TVector<TType*> leftBlockItems;
- std::transform(leftItems.cbegin(), leftItems.cend(), std::back_inserter(leftBlockItems),
- [&](const auto& itemType) {
- return pb.NewBlockType(itemType, TBlockType::EShape::Many);
- });
- // XXX: Mind the last block length column.
- leftBlockItems.push_back(blockLenType);
- const auto leftBlockType = pb.NewTupleType(leftBlockItems);
-
- // 2. Build AST with BlockMapJoinCore.
- TRuntimeNode leftArg;
- const auto joinNode = BuildBlockJoin(pb, joinKind, leftKeyColumns, leftKeyDrops,
- leftArg, leftBlockType, rightNode);
-
- // 3. Prepare non-block type for the result of the join node.
- const auto joinBlockType = joinNode.GetStaticType();
- Y_ENSURE(joinBlockType->IsList(), "Join result has to be list");
- const auto joinListType = AS_TYPE(TListType, joinBlockType)->GetItemType();
- Y_ENSURE(joinListType->IsTuple(), "List item has to be tuple");
- const auto joinBlockItems = AS_TYPE(TTupleType, joinListType)->GetElements();
- TVector<TType*> joinItems;
- // XXX: Mind the last block length column.
- std::transform(joinBlockItems.cbegin(), std::prev(joinBlockItems.cend()), std::back_inserter(joinItems),
- [](const auto& blockItemType) {
- const auto& blockType = AS_TYPE(TBlockType, blockItemType);
- Y_ENSURE(blockType->GetShape() == TBlockType::EShape::Many);
- return blockType->GetItemType();
- });
-
- // 4. Build computation graph with BlockMapJoinCore node as a root.
- // Pass the values from the "left node" as the input for the
- // BlockMapJoinCore graph.
- const auto graph = setup.BuildGraph(joinNode, {leftArg.GetNode()});
- const auto& leftBlocks = graph->GetEntryPoint(0, true);
- auto& ctx = graph->GetContext();
- leftBlocks->SetValue(ctx, ToBlocks(ctx, blockSize, leftItems, leftListValue));
- const auto joinValues = FromBlocks(ctx, joinBlockItems, graph->GetValue());
- return joinValues;
-}
+ joinReturnItems.push_back(pgmBuilder.NewBlockType(pgmBuilder.NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar));
-TVector<NUdf::TUnboxedValue> ConvertListToVector(const NUdf::TUnboxedValue& list) {
- NUdf::TUnboxedValue current;
- NUdf::TUnboxedValue iterator = list.GetListIterator();
- TVector<NUdf::TUnboxedValue> items;
- while (iterator.Next(current)) {
- items.push_back(current);
- }
- return items;
-}
+ TType* joinReturnType = pgmBuilder.NewStreamType(pgmBuilder.NewMultiType(joinReturnItems));
+ auto joinNode = pgmBuilder.BlockMapJoinCore(
+ leftStream,
+ rightStream,
+ joinKind,
+ leftKeyColumns,
+ leftKeyDrops,
+ rightKeyColumns,
+ rightKeyDrops,
+ rightAny,
+ joinReturnType
+ );
-void CompareResults(const TType* type, const NUdf::TUnboxedValue& expected,
- const NUdf::TUnboxedValue& got
-) {
- const auto itemType = AS_TYPE(TListType, type)->GetItemType();
- const NUdf::ICompare::TPtr compare = MakeCompareImpl(itemType);
- const NUdf::IEquate::TPtr equate = MakeEquateImpl(itemType);
- // XXX: Stub both keyTypes and isTuple arguments, since
- // ICompare/IEquate are used.
- TKeyTypes keyTypesStub;
- bool isTupleStub = false;
- const TValueLess valueLess(keyTypesStub, isTupleStub, compare.Get());
- const TValueEqual valueEqual(keyTypesStub, isTupleStub, equate.Get());
-
- auto expectedItems = ConvertListToVector(expected);
- auto gotItems = ConvertListToVector(got);
- UNIT_ASSERT_VALUES_EQUAL(expectedItems.size(), gotItems.size());
- Sort(expectedItems, valueLess);
- Sort(gotItems, valueLess);
- for (size_t i = 0; i < expectedItems.size(); i++) {
- UNIT_ASSERT(valueEqual(gotItems[i], expectedItems[i]));
- }
+ return FromWideStream(pgmBuilder, DethrottleStream(pgmBuilder, joinNode));
}
-void RunTestBlockJoin(TSetup<false>& setup, EJoinKind joinKind,
- const TType* expectedType, const NUdf::TUnboxedValue& expected,
- const TRuntimeNode& rightNode, const TType* leftType,
- const NUdf::TUnboxedValue& leftListValue, const TVector<ui32>& leftKeyColumns,
- const TVector<ui32>& leftKeyDrops = {}
+NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup,
+ TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops,
+ TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny,
+ EJoinKind joinKind, size_t blockSize
) {
- const size_t testSize = leftListValue.GetListLength();
- for (size_t blockSize = 8; blockSize <= testSize; blockSize <<= 1) {
- const auto got = DoTestBlockJoin(setup, leftType, leftListValue,
- leftKeyColumns, leftKeyDrops,
- rightNode, joinKind, blockSize);
- CompareResults(expectedType, expected, got);
- }
-}
+ TProgramBuilder& pb = *setup.PgmBuilder;
-//
-// Auxiliary routines to build list nodes from the given vectors.
-//
-
-struct TTypeMapperBase {
- TProgramBuilder& Pb;
- TType* ItemType;
- auto GetType() { return ItemType; }
-};
-
-template <typename Type>
-struct TTypeMapper: TTypeMapperBase {
- TTypeMapper(TProgramBuilder& pb): TTypeMapperBase {pb, pb.NewDataType(NUdf::TDataType<Type>::Id) } {}
- auto GetValue(const Type& value) {
- return Pb.NewDataLiteral<Type>(value);
- }
-};
+ Y_ENSURE(leftType->IsList(), "Left node has to be list");
+ const auto leftItemType = AS_TYPE(TListType, leftType)->GetItemType();
+ Y_ENSURE(leftItemType->IsTuple(), "List item has to be tuple");
+ TType* leftBlockType = MakeBlockTupleType(pb, leftItemType);
-template <>
-struct TTypeMapper<TString>: TTypeMapperBase {
- TTypeMapper(TProgramBuilder& pb): TTypeMapperBase {pb, pb.NewDataType(NUdf::EDataSlot::String)} {}
- auto GetValue(const TString& value) {
- return Pb.NewDataLiteral<NUdf::EDataSlot::String>(value);
- }
-};
-
-template <typename TNested>
-class TTypeMapper<std::optional<TNested>>: TTypeMapper<TNested> {
- using TBase = TTypeMapper<TNested>;
-public:
- TTypeMapper(TProgramBuilder& pb): TBase(pb) {}
- auto GetType() { return TBase::Pb.NewOptionalType(TBase::GetType()); }
- auto GetValue(const std::optional<TNested>& value) {
- if (value == std::nullopt) {
- return TBase::Pb.NewEmptyOptional(GetType());
- } else {
- return TBase::Pb.NewOptional(TBase::GetValue(*value));
- }
- }
-};
+ Y_ENSURE(rightType->IsList(), "Right node has to be list");
+ const auto rightItemType = AS_TYPE(TListType, rightType)->GetItemType();
+ Y_ENSURE(rightItemType->IsTuple(), "Right item has to be tuple");
+ TType* rightBlockType = MakeBlockTupleType(pb, rightItemType);
-template<typename Type>
-const TVector<const TRuntimeNode> BuildListNodes(TProgramBuilder& pb,
- const TVector<Type>& vector
-) {
- TTypeMapper<Type> mapper(pb);
+ TRuntimeNode leftList = pb.Arg(pb.NewListType(leftBlockType));
+ TRuntimeNode rightList = pb.Arg(pb.NewListType(rightBlockType));
+ const auto joinNode = BuildBlockJoin(pb, joinKind, leftList, leftKeyColumns, leftKeyDrops, rightList, rightKeyColumns, rightKeyDrops, rightAny);
- TRuntimeNode::TList listItems;
- std::transform(vector.cbegin(), vector.cend(), std::back_inserter(listItems),
- [&](const auto value) {
- return mapper.GetValue(value);
- });
+ const auto joinType = joinNode.GetStaticType();
+ Y_ENSURE(joinType->IsList(), "Join result has to be list");
+ const auto joinItemType = AS_TYPE(TListType, joinType)->GetItemType();
+ Y_ENSURE(joinItemType->IsTuple(), "List item has to be tuple");
- return {pb.NewList(mapper.GetType(), listItems)};
-}
+ const auto graph = setup.BuildGraph(joinNode, {leftList.GetNode(), rightList.GetNode()});
-template<typename Type, typename... Tail>
-const TVector<const TRuntimeNode> BuildListNodes(TProgramBuilder& pb,
- const TVector<Type>& vector, Tail... vectors
-) {
- const auto frontList = BuildListNodes(pb, vector);
- const auto tailLists = BuildListNodes(pb, std::forward<Tail>(vectors)...);
- TVector<const TRuntimeNode> lists;
- lists.reserve(tailLists.size() + 1);
- lists.push_back(frontList.front());;
- for (const auto& list : tailLists) {
- lists.push_back(list);
- }
- return lists;
+ auto& ctx = graph->GetContext();
+ graph->GetEntryPoint(0, true)->SetValue(ctx, ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue)));
+ graph->GetEntryPoint(1, true)->SetValue(ctx, ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue)));
+ return FromBlocks(ctx, AS_TYPE(TTupleType, joinItemType)->GetElements(), graph->GetValue());
}
-template<typename... TVectors>
-const std::pair<TType*, NUdf::TUnboxedValue> ConvertVectorsToTuples(
- TSetup<false>& setup, TVectors... vectors
+void RunTestBlockJoin(TSetup<false>& setup, EJoinKind joinKind,
+ TType* expectedType, const NUdf::TUnboxedValue& expected,
+ TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns,
+ TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns,
+ const TVector<ui32>& leftKeyDrops = {}, const TVector<ui32>& rightKeyDrops = {}, bool rightAny = false
) { // Runs the join once per block size (1, 2, 4, ... up to the left list length) and compares every result with `expected`.
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto lists = BuildListNodes(pb, std::forward<TVectors>(vectors)...);
- const auto tuplesNode = pb.Zip(lists);
- const auto tuplesNodeType = tuplesNode.GetStaticType();
- const auto tuples = setup.BuildGraph(tuplesNode)->GetValue();
- return std::make_pair(tuplesNodeType, tuples);
-}
-
-TVector<TString> GenerateValues(size_t level) {
- constexpr size_t alphaSize = 'Z' - 'A' + 1;
- if (level == 1) {
- TVector<TString> alphabet(alphaSize);
- std::iota(alphabet.begin(), alphabet.end(), 'A');
- return alphabet;
- }
- const auto subValues = GenerateValues(level - 1);
- TVector<TString> values;
- values.reserve(alphaSize * subValues.size());
- for (char ch = 'A'; ch <= 'Z'; ch++) {
- for (const auto& tail : subValues) {
- values.emplace_back(ch + tail);
- }
- }
- return values;
-}
-
-TSet<ui64> GenerateFibonacci(size_t count) {
- TSet<ui64> fibSet;
- ui64 a = 0, b = 1;
- fibSet.insert(a);
- while (count--) {
- a = std::exchange(b, a + b);
- fibSet.insert(b);
+ const size_t testSize = leftListValue.GetListLength(); // upper bound for the block size comes from the left list only
+ for (size_t blockSize = 1; blockSize <= testSize; blockSize <<= 1) { // NOTE(review): with an empty left list the body never runs, so nothing is compared — confirm that is intended
+ const auto got = DoTestBlockJoin(setup,
+ leftType, std::move(leftListValue), leftKeyColumns, leftKeyDrops, // NOTE(review): std::move is applied on every iteration; if anything downstream really moves from the value, later iterations see a moved-from list — confirm or pass by const reference
+ rightType, std::move(rightListValue), rightKeyColumns, rightKeyDrops, rightAny, // NOTE(review): same repeated-move concern as for the left list
+ joinKind, blockSize
+ );
+ CompareResults(expectedType, expected, got);
}
- return fibSet;
}
} // namespace
-Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinBasicTest) {
-
+Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestBasic) {
constexpr size_t testSize = 1 << 14;
constexpr size_t valueSize = 3;
static const TVector<TString> threeLetterValues = GenerateValues(valueSize);
- static const TSet<ui64> fibonacci = GenerateFibonacci(21);
+ static const TSet<ui64> fibonacci = GenerateFibonacci(testSize);
+ static const TString hugeString(128, '1');
+
+ Y_UNIT_TEST(TestInnerJoin) {
+ TSetup<false> setup(GetNodeFactory());
- Y_UNIT_TEST(TestInnerOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
[](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
[](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
+
+ // 2. Make input for the "right" stream.
const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayloadInit;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit),
+ TVector<TString> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
[](const auto key) { return std::to_string(key); });
+
// 3. Make "expected" data.
TMap<ui64, TString> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMap.find(keyInit[i]);
+ rightMap[rightKeyInit[i]] = rightValueInit[i];
+ }
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<TString> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ const auto& found = rightMap.find(leftKeyInit[i]);
if (found != rightMap.cend()) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(found->second);
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(found->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMapNode, leftType, leftList, {0});
- }
- Y_UNIT_TEST(TestInnerMultiOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayload1Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init),
- [](const auto key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init),
- [](const auto key) { return std::to_string(key * 1001); });
- // 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
- for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMultiMap.find(keyInit[i]);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}
+ );
}
- Y_UNIT_TEST(TestLeftOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayloadInit;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit),
- [](const auto key) { return std::to_string(key); });
- // 3. Make "expected" data.
- TMap<ui64, TString> rightMap;
- for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- const auto& found = rightMap.find(keyInit[i]);
- if (found != rightMap.cend()) {
- rightExpected.push_back(found->second);
- } else {
- rightExpected.push_back(std::nullopt);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMapNode, leftType, leftList, {0});
- }
+ Y_UNIT_TEST(TestInnerJoinMulti) {
+ TSetup<false> setup(GetNodeFactory());
- Y_UNIT_TEST(TestLeftMultiOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
[](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
[](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayload1Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init),
+
+ // 2. Make input for the "right" stream.
+ TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ TVector<TString> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
[](const auto key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init),
+
+ // Add rows with the same keys
+ std::copy_n(rightKeyInit.begin(), rightKeyInit.size(), std::back_inserter(rightKeyInit));
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
[](const auto key) { return std::to_string(key * 1001); });
+
// 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
+ TMultiMap<ui64, TString> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
+ rightMap.insert({rightKeyInit[i], rightValueInit[i]});
}
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMultiMap.find(keyInit[i]);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
- } else {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(std::nullopt);
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<TString> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ const auto& [begin, end] = rightMap.equal_range(leftKeyInit[i]);
+ for (auto it = begin; it != end; it++) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(it->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0});
- }
- Y_UNIT_TEST(TestLeftSemiOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- // 3. Make "expected" data.
- TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (rightSet.contains(keyInit[i])) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected,
- rightSetNode, leftType, leftList, {0});
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
+ RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}
+ );
}
- Y_UNIT_TEST(TestLeftOnlyOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestInnerJoinRightAny) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
[](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
[](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- // 3. Make "expected" data.
- TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (!rightSet.contains(keyInit[i])) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected,
- rightSetNode, leftType, leftList, {0});
- }
-} // Y_UNIT_TEST_SUITE
-
-Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinNullKeysTest) {
-
- constexpr size_t testSize = 1 << 14;
- constexpr size_t valueSize = 3;
- static const TVector<TString> threeLetterValues = GenerateValues(valueSize);
- static const TSet<ui64> fibonacci = GenerateFibonacci(21);
+ // 2. Make input for the "right" stream.
+ TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ TVector<TString> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
+ [](const auto key) { return std::to_string(key); });
- Y_UNIT_TEST(TestInnerOnOptionalUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<std::optional<ui64>> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return *key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[*key]; });
- // 1a. Make some keys NULL
- keyInit[0] = std::nullopt;
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayloadInit;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit),
+ // Add rows with the same keys
+ std::copy_n(rightKeyInit.begin(), rightKeyInit.size(), std::back_inserter(rightKeyInit));
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
[](const auto key) { return std::to_string(key); });
+
// 3. Make "expected" data.
TMap<ui64, TString> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
- }
- TVector<std::optional<ui64>> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (!keyInit[i]) {
- continue;
- }
- const auto& found = rightMap.find(*keyInit[i]);
+ rightMap[rightKeyInit[i]] = rightValueInit[i];
+ }
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<TString> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ auto found = rightMap.find(leftKeyInit[i]);
if (found != rightMap.cend()) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(found->second);
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(found->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMapNode, leftType, leftList, {0});
- }
- Y_UNIT_TEST(TestInnerMultiOnOptionalUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<std::optional<ui64>> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return *key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[*key]; });
- // 1a. Make some keys NULL
- keyInit[0] = std::nullopt;
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayload1Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init),
- [](const auto key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init),
- [](const auto key) { return std::to_string(key * 1001); });
- // 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
- for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<std::optional<ui64>> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (!keyInit[i]) {
- continue;
- }
- const auto& found = rightMultiMap.find(*keyInit[i]);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}, true
+ );
}
- Y_UNIT_TEST(TestLeftOnOptionalUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestLeftJoin) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<std::optional<ui64>> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return *key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[*key]; });
- // 1a. Make some keys NULL
- keyInit[0] = std::nullopt;
- // 2. Make input for the "right" dict.
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+
+ // 2. Make input for the "right" stream.
const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayloadInit;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit),
+ TVector<TString> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
[](const auto key) { return std::to_string(key); });
+
// 3. Make "expected" data.
TMap<ui64, TString> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
- }
- TVector<std::optional<ui64>> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- const auto& found = keyInit[i] ? rightMap.find(*keyInit[i]) : rightMap.cend();
+ rightMap[rightKeyInit[i]] = rightValueInit[i];
+ }
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<std::optional<TString>> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ const auto& found = rightMap.find(leftKeyInit[i]);
if (found != rightMap.cend()) {
- rightExpected.push_back(found->second);
+ expectedRightValue.push_back(found->second);
} else {
- rightExpected.push_back(std::nullopt);
+ expectedRightValue.push_back(std::nullopt);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMapNode, leftType, leftList, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}
+ );
}
- Y_UNIT_TEST(TestLeftMultiOnOptionalUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestLeftJoinMulti) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<std::optional<ui64>> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return *key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[*key]; });
- // 1a. Make some keys NULL
- keyInit[0] = std::nullopt;
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayload1Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init),
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+
+ // 2. Make input for the "right" stream.
+ TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ TVector<TString> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
[](const auto key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init),
+
+ // Add rows with the same keys
+ std::copy_n(rightKeyInit.begin(), rightKeyInit.size(), std::back_inserter(rightKeyInit));
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
[](const auto key) { return std::to_string(key * 1001); });
+
// 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
+ TMultiMap<ui64, TString> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<std::optional<ui64>> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = keyInit[i] ? rightMultiMap.find(*keyInit[i]) : rightMultiMap.cend();
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
+ rightMap.insert({rightKeyInit[i], rightValueInit[i]});
+ }
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<std::optional<TString>> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ const auto& [begin, end] = rightMap.equal_range(leftKeyInit[i]);
+ if (begin != end) {
+ for (auto it = begin; it != end; it++) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(it->second);
}
} else {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(std::nullopt);
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(std::nullopt);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}
+ );
}
- Y_UNIT_TEST(TestLeftSemiOnOptionalUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestLeftSemiJoin) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<std::optional<ui64>> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return *key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[*key]; });
- // 1a. Make some keys NULL
- keyInit[0] = std::nullopt;
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+
+ // 2. Make input for the "right" stream.
+ TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ // Add rows with the same keys
+ std::copy_n(rightKeyInit.begin(), rightKeyInit.size(), std::back_inserter(rightKeyInit));
+
// 3. Make "expected" data.
TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<std::optional<ui64>> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (keyInit[i] && rightSet.contains(*keyInit[i])) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ if (rightSet.contains(leftKeyInit[i])) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeyInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue);
+
RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected,
- rightSetNode, leftType, leftList, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0}
+ );
}
- Y_UNIT_TEST(TestLeftOnlyOnOptionalUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestLeftOnlyJoin) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<std::optional<ui64>> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return *key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[*key]; });
- // 1a. Make some keys NULL
- keyInit[0] = std::nullopt;
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+
+ // 2. Make input for the "right" stream.
+ TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ // Add rows with the same keys
+ std::copy_n(rightKeyInit.begin(), rightKeyInit.size(), std::back_inserter(rightKeyInit));
+
// 3. Make "expected" data.
TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<std::optional<ui64>> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (!(keyInit[i] && rightSet.contains(*keyInit[i]))) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ if (!rightSet.contains(leftKeyInit[i])) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeyInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue);
+
RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected,
- rightSetNode, leftType, leftList, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0}
+ );
}
-} // Y_UNIT_TEST_SUITE
+ Y_UNIT_TEST(TestKeyTuple) {
+ TSetup<false> setup(GetNodeFactory());
-Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinMoreTest) {
+ // 1. Make input for the "left" stream.
+ TVector<ui64> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[key]; });
- constexpr size_t testSize = 1 << 14;
- constexpr size_t valueSize = 3;
- static const TVector<TString> threeLetterValues = GenerateValues(valueSize);
- static const TString hugeString(128, '1');
+ // 2. Make input for the "right" stream.
+ TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
+ TVector<ui64> rightKey2Init;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
+ [](const auto& key) { return key * 1001; });
+ TVector<TString> rightValueInit;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightValueInit),
+ [](const auto& key) { return std::to_string(key); });
- Y_UNIT_TEST(TestInnerOn1) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::fill(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- TVector<TString> valueInit;
- for (size_t i = 0; i < keyInit.size(); i++) {
- subkeyInit.push_back(i * 1001);
- valueInit.push_back(threeLetterValues[i]);
- }
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit({1});
- TVector<TString> rightPayloadInit({hugeString});
// 3. Make "expected" data.
- TMap<ui64, TString> rightMap;
- for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMap.find(keyInit[i]);
+ TMap<std::tuple<ui64, ui64>, TString> rightMap;
+ for (size_t i = 0; i < rightKey1Init.size(); i++) {
+ const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]);
+ rightMap[key] = rightValueInit[i];
+ }
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<TString> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ const auto key = std::make_tuple(leftKeyInit[i], leftSubkeyInit[i]);
+ const auto found = rightMap.find(key);
if (found != rightMap.cend()) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(found->second);
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(found->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMapNode, leftType, leftList, {0});
- }
- Y_UNIT_TEST(TestInnerMultiOn1) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::fill(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- TVector<TString> valueInit;
- for (size_t i = 0; i < keyInit.size(); i++) {
- subkeyInit.push_back(i * 1001);
- valueInit.push_back(threeLetterValues[i]);
- }
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit({1});
- TVector<TString> rightPayload1Init({"1"});
- TVector<TString> rightPayload2Init({hugeString});
- // 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
- for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMultiMap.find(keyInit[i]);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKey1Init, rightValueInit, rightKey2Init);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0});
+ leftType, std::move(leftList), {0, 1},
+ rightType, std::move(rightList), {0, 2},
+ {}, {0, 2}
+ );
}
- Y_UNIT_TEST(TestLeftOn1) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestInnerJoinOutputSlicing) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::fill(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- TVector<TString> valueInit;
- for (size_t i = 0; i < keyInit.size(); i++) {
- subkeyInit.push_back(i * 1001);
- valueInit.push_back(threeLetterValues[i]);
- }
- // 2. Make input for the "right" dict.
+ TVector<ui64> leftKeyInit(testSize);
+ std::fill(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return key * 1001; });
+ TVector<TString> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[key]; });
+
+ // 2. Make input for the "right" stream.
+ // Huge string is used to make less rows fit into one block
const TVector<ui64> rightKeyInit({1});
- TVector<TString> rightPayloadInit({hugeString});
+ TVector<TString> rightValueInit({hugeString});
+
// 3. Make "expected" data.
TMap<ui64, TString> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- const auto& found = rightMap.find(keyInit[i]);
+ rightMap[rightKeyInit[i]] = rightValueInit[i];
+ }
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<TString> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ const auto& found = rightMap.find(leftKeyInit[i]);
if (found != rightMap.cend()) {
- rightExpected.push_back(found->second);
- } else {
- rightExpected.push_back(std::nullopt);
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(found->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMapNode, leftType, leftList, {0});
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
+
+ RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}
+ );
}
- Y_UNIT_TEST(TestLeftMultiOn1) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestInnerJoinHugeIterator) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::fill(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- TVector<TString> valueInit;
- for (size_t i = 0; i < keyInit.size(); i++) {
- subkeyInit.push_back(i * 1001);
- valueInit.push_back(threeLetterValues[i]);
- }
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit({1});
- TVector<TString> rightPayload1Init({"1"});
- TVector<TString> rightPayload2Init({hugeString});
+ TVector<ui64> leftKeyInit({1});
+ TVector<ui64> leftSubkeyInit({1001});
+ TVector<TString> leftValueInit({threeLetterValues[1]});
+
+ // 2. Make input for the "right" stream.
+ // Huge string is used to make less rows fit into one block
+ TVector<ui64> rightKeyInit(1 << 16);
+ std::fill(rightKeyInit.begin(), rightKeyInit.end(), 1);
+ TVector<TString> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
+ [](const auto& key) { return std::to_string(key); });
+
// 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
+ TMultiMap<ui64, TString> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
+ rightMap.insert({rightKeyInit[i], rightValueInit[i]});
}
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMultiMap.find(keyInit[i]);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
- } else {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(std::nullopt);
+ TVector<ui64> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<TString> expectedValue;
+ TVector<TString> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ const auto& [begin, end] = rightMap.equal_range(leftKeyInit[i]);
+ for (auto it = begin; it != end; it++) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightValue.push_back(it->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0});
- }
- Y_UNIT_TEST(TestLeftSemiOn1) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::fill(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- TVector<TString> valueInit;
- for (size_t i = 0; i < keyInit.size(); i++) {
- subkeyInit.push_back(i * 1001);
- valueInit.push_back(threeLetterValues[i]);
- }
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit({1});
- // 3. Make "expected" data.
- TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (rightSet.contains(keyInit[i])) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected,
- rightSetNode, leftType, leftList, {0});
- }
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightValue);
- Y_UNIT_TEST(TestLeftOnlyOn1) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::fill(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- TVector<TString> valueInit;
- for (size_t i = 0; i < keyInit.size(); i++) {
- subkeyInit.push_back(i * 1001);
- valueInit.push_back(threeLetterValues[i]);
- }
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit({1});
- // 3. Make "expected" data.
- TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (!rightSet.contains(keyInit[i])) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected,
- rightSetNode, leftType, leftList, {0});
+ RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0},
+ {}, {0}
+ );
}
} // Y_UNIT_TEST_SUITE
-Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinDropKeyColumns) {
-
+Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestOptional) {
constexpr size_t testSize = 1 << 14;
constexpr size_t valueSize = 3;
static const TVector<TString> threeLetterValues = GenerateValues(valueSize);
- static const TSet<ui64> fibonacci = GenerateFibonacci(21);
+ static const TSet<ui64> fibonacci = GenerateFibonacci(testSize);
+
+ Y_UNIT_TEST(TestInnerJoin) {
+ TSetup<false> setup(GetNodeFactory());
- Y_UNIT_TEST(TestInnerOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayloadInit;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit),
- [](const auto key) { return std::to_string(key); });
- // 3. Make "expected" data.
- TMap<ui64, TString> rightMap;
+ TVector<std::optional<ui64>> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return *key * 1001; });
+ TVector<std::optional<TString>> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[*key]; });
+
+ // 2. Make input for the "right" stream.
+ TVector<std::optional<ui64>> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ TVector<std::optional<TString>> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
+ [](const auto key) { return std::to_string(*key); });
+
+ // 3. Add some NULLs
+ leftKeyInit[0] = leftKeyInit[2] = std::nullopt;
+ rightKeyInit[2] = rightKeyInit[3] = std::nullopt;
+
+ leftValueInit[1] = leftValueInit[11] = leftValueInit[41] = std::nullopt;
+ rightValueInit[2] = rightValueInit[12] = rightValueInit[42] = std::nullopt;
+
+ // 4. Make "expected" data.
+ TMap<ui64, std::optional<TString>> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
+ if (rightKeyInit[i].has_value()) {
+ rightMap[*rightKeyInit[i]] = rightValueInit[i];
+ }
}
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMap.find(keyInit[i]);
+ TVector<std::optional<ui64>> expectedLeftKey;
+ TVector<ui64> expectedSubkey;
+ TVector<std::optional<TString>> expectedValue;
+ TVector<std::optional<ui64>> expectedRightKey;
+ TVector<std::optional<TString>> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ if (!leftKeyInit[i]) {
+ continue;
+ }
+ const auto& found = rightMap.find(*leftKeyInit[i]);
if (found != rightMap.cend()) {
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(found->second);
+ expectedLeftKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightKey.push_back(found->first);
+ expectedRightValue.push_back(found->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedLeftKey, expectedSubkey, expectedValue, expectedRightKey, expectedRightValue);
+
RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMapNode, leftType, leftList, {0}, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0}
+ );
}
- Y_UNIT_TEST(TestInnerMultiOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestLeftJoin) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayload1Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init),
- [](const auto key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init),
- [](const auto key) { return std::to_string(key * 1001); });
- // 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
+ TVector<std::optional<ui64>> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return *key * 1001; });
+ TVector<std::optional<TString>> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[*key]; });
+
+ // 2. Make input for the "right" stream.
+ TVector<std::optional<ui64>> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+ TVector<std::optional<TString>> rightValueInit;
+ std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit),
+ [](const auto key) { return std::to_string(*key); });
+
+ // 3. Add some NULLs
+ leftKeyInit[0] = leftKeyInit[2] = std::nullopt;
+ rightKeyInit[2] = rightKeyInit[3] = std::nullopt;
+
+ leftValueInit[1] = leftValueInit[11] = leftValueInit[41] = std::nullopt;
+ rightValueInit[2] = rightValueInit[12] = rightValueInit[42] = std::nullopt;
+
+ // 4. Make "expected" data.
+ TMap<ui64, std::optional<TString>> rightMap;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMultiMap.find(keyInit[i]);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
+ if (rightKeyInit[i].has_value()) {
+ rightMap[*rightKeyInit[i]] = rightValueInit[i];
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0}, {0});
- }
+ TVector<std::optional<ui64>> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<std::optional<TString>> expectedValue;
+ TVector<std::optional<ui64>> expectedRightKey;
+ TVector<std::optional<TString>> expectedRightValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
- Y_UNIT_TEST(TestLeftOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayloadInit;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit),
- [](const auto key) { return std::to_string(key); });
- // 3. Make "expected" data.
- TMap<ui64, TString> rightMap;
- for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMap[rightKeyInit[i]] = rightPayloadInit[i];
- }
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- const auto& found = rightMap.find(keyInit[i]);
+ const auto& found = rightMap.find(*leftKeyInit[i]);
if (found != rightMap.cend()) {
- rightExpected.push_back(found->second);
+ expectedRightKey.push_back(found->first);
+ expectedRightValue.push_back(found->second);
} else {
- rightExpected.push_back(std::nullopt);
+ expectedRightKey.push_back(std::nullopt);
+ expectedRightValue.push_back(std::nullopt);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKeyInit, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue, expectedRightKey, expectedRightValue);
+
RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMapNode, leftType, leftList, {0}, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0}
+ );
}
- Y_UNIT_TEST(TestLeftMultiOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestLeftSemiJoin) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- TVector<TString> rightPayload1Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init),
- [](const auto key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init),
- [](const auto key) { return std::to_string(key * 1001); });
- // 3. Make "expected" data.
- TMap<ui64, TVector<TString>> rightMultiMap;
+ TVector<std::optional<ui64>> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return *key * 1001; });
+ TVector<std::optional<TString>> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[*key]; });
+
+ // 2. Make input for the "right" stream.
+ TVector<std::optional<ui64>> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
+
+ // 3. Add some NULLs
+ leftKeyInit[0] = leftKeyInit[2] = std::nullopt;
+ rightKeyInit[2] = rightKeyInit[3] = std::nullopt;
+ leftValueInit[1] = leftValueInit[11] = leftValueInit[41] = std::nullopt;
+
+ // 4. Make "expected" data.
+ TSet<ui64> rightSet;
for (size_t i = 0; i < rightKeyInit.size(); i++) {
- rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto& found = rightMultiMap.find(keyInit[i]);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
- } else {
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(std::nullopt);
+ if (rightKeyInit[i].has_value()) {
+ rightSet.insert(*rightKeyInit[i]);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0}, {0});
- }
-
- Y_UNIT_TEST(TestLeftSemiOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- // 3. Make "expected" data.
- TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (rightSet.contains(keyInit[i])) {
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
+ TVector<std::optional<ui64>> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<std::optional<TString>> expectedValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ if (!leftKeyInit[i]) {
+ continue;
+ }
+ if (rightSet.contains(*leftKeyInit[i])) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeyInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue);
+
RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected,
- rightSetNode, leftType, leftList, {0}, {0});
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0}
+ );
}
- Y_UNIT_TEST(TestLeftOnlyOnUint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- // 3. Make "expected" data.
- TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend());
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- if (!rightSet.contains(keyInit[i])) {
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected,
- rightSetNode, leftType, leftList, {0}, {0});
- }
+ Y_UNIT_TEST(TestLeftOnlyJoin) {
+ TSetup<false> setup(GetNodeFactory());
-} // Y_UNIT_TEST_SUITE
+ // 1. Make input for the "left" stream.
+ TVector<std::optional<ui64>> leftKeyInit(testSize);
+ std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1);
+ TVector<ui64> leftSubkeyInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit),
+ [](const auto key) { return *key * 1001; });
+ TVector<std::optional<TString>> leftValueInit;
+ std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[*key]; });
-Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinMultiKeyBasicTest) {
+ // 2. Make input for the "right" stream.
+ TVector<std::optional<ui64>> rightKeyInit(fibonacci.cbegin(), fibonacci.cend());
- constexpr size_t testSize = 1 << 14;
- constexpr size_t valueSize = 3;
- static const TVector<TString> threeLetterValues = GenerateValues(valueSize);
- static const TSet<ui64> fibonacci = GenerateFibonacci(21);
+ // 3. Add some NULLs
+ leftKeyInit[0] = leftKeyInit[2] = std::nullopt;
+ rightKeyInit[2] = rightKeyInit[3] = std::nullopt;
+ leftValueInit[1] = leftValueInit[11] = leftValueInit[41] = std::nullopt;
- Y_UNIT_TEST(TestInnerOnUint64Uint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
- TVector<ui64> rightKey2Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
- [](const auto& key) { return key * 1001; });
- TVector<TString> rightPayloadInit;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayloadInit),
- [](const auto& key) { return std::to_string(key); });
- // 3. Make "expected" data.
- TMap<std::tuple<ui64, ui64>, TString> rightMap;
- for (size_t i = 0; i < rightKey1Init.size(); i++) {
- const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]);
- rightMap[key] = rightPayloadInit[i];
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
- const auto found = rightMap.find(key);
- if (found != rightMap.cend()) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(found->second);
+ // 4. Make "expected" data.
+ TSet<ui64> rightSet;
+ for (size_t i = 0; i < rightKeyInit.size(); i++) {
+ if (rightKeyInit[i].has_value()) {
+ rightSet.insert(*rightKeyInit[i]);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMapNode, leftType, leftList, {0, 1});
- }
-
- Y_UNIT_TEST(TestInnerMultiOnUint64Uint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
- TVector<ui64> rightKey2Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
- [](const auto& key) { return key * 1001; });
- TVector<TString> rightPayload1Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayload1Init),
- [](const auto& key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKey2Init.cbegin(), rightKey2Init.cend(), std::back_inserter(rightPayload2Init),
- [](const auto& key) { return std::to_string(key); });
- // 3. Make "expected" data.
- TMap<std::tuple<ui64, ui64>, TVector<TString>> rightMultiMap;
- for (size_t i = 0; i < rightKey1Init.size(); i++) {
- const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]);
- rightMultiMap[key] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<TString> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
- const auto found = rightMultiMap.find(key);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
+ TVector<std::optional<ui64>> expectedKey;
+ TVector<ui64> expectedSubkey;
+ TVector<std::optional<TString>> expectedValue;
+ for (size_t i = 0; i < leftKeyInit.size(); i++) {
+ if (!leftKeyInit[i] || !rightSet.contains(*leftKeyInit[i])) {
+ expectedKey.push_back(leftKeyInit[i]);
+ expectedSubkey.push_back(leftSubkeyInit[i]);
+ expectedValue.push_back(leftValueInit[i]);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0, 1});
+
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKeyInit, leftSubkeyInit, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeyInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedKey, expectedSubkey, expectedValue);
+
+ RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected,
+ leftType, std::move(leftList), {0},
+ rightType, std::move(rightList), {0}
+ );
}
- Y_UNIT_TEST(TestLeftOnUint64Uint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
+ Y_UNIT_TEST(TestKeyTuple) {
+ TSetup<false> setup(GetNodeFactory());
+
// 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
- TVector<ui64> rightKey2Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
- [](const auto& key) { return key * 1001; });
- TVector<TString> rightPayloadInit;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayloadInit),
- [](const auto& key) { return std::to_string(key); });
- // 3. Make "expected" data.
+ TVector<std::optional<ui64>> leftKey1Init(testSize);
+ std::iota(leftKey1Init.begin(), leftKey1Init.end(), 1);
+ TVector<std::optional<ui64>> leftKey2Init(testSize);
+ std::iota(leftKey2Init.begin(), leftKey2Init.end(), 1);
+ TVector<TString> leftValueInit;
+ std::transform(leftKey1Init.cbegin(), leftKey1Init.cend(), std::back_inserter(leftValueInit),
+ [](const auto key) { return threeLetterValues[*key]; });
+
+ // 2. Make input for the "right" stream.
+ TVector<std::optional<ui64>> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
+ TVector<std::optional<ui64>> rightKey2Init(fibonacci.cbegin(), fibonacci.cend());
+ TVector<TString> rightValueInit;
+ std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightValueInit),
+ [](const auto key) { return std::to_string(*key); });
+
+ // 3. Add some NULLs
+ leftKey1Init[0] = leftKey1Init[1] = std::nullopt;
+ leftKey2Init[1] = std::nullopt;
+ rightKey1Init[1] = rightKey1Init[2] = std::nullopt;
+ rightKey2Init[2] = std::nullopt;
+
+ // 4. Make "expected" data.
TMap<std::tuple<ui64, ui64>, TString> rightMap;
for (size_t i = 0; i < rightKey1Init.size(); i++) {
- const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]);
- rightMap[key] = rightPayloadInit[i];
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
- const auto found = rightMap.find(key);
+ if (rightKey1Init[i].has_value() && rightKey2Init[i].has_value()) {
+ const auto key = std::make_tuple(*rightKey1Init[i], *rightKey2Init[i]);
+ rightMap[key] = rightValueInit[i];
+ }
+ }
+ TVector<std::optional<ui64>> expectedLeftKey1;
+ TVector<std::optional<ui64>> expectedLeftKey2;
+ TVector<TString> expectedValue;
+ TVector<std::optional<ui64>> expectedRightKey1;
+ TVector<std::optional<ui64>> expectedRightKey2;
+ TVector<TString> expectedRightValue;
+ for (size_t i = 0; i < leftKey1Init.size(); i++) {
+ if (!leftKey1Init[i] || !leftKey2Init[i]) {
+ continue;
+ }
+ const auto key = std::make_tuple(*leftKey1Init[i], *leftKey2Init[i]);
+ const auto& found = rightMap.find(key);
if (found != rightMap.cend()) {
- rightExpected.push_back(found->second);
- } else {
- rightExpected.push_back(std::nullopt);
+ expectedLeftKey1.push_back(leftKey1Init[i]);
+ expectedLeftKey2.push_back(leftKey2Init[i]);
+ expectedValue.push_back(leftValueInit[i]);
+ expectedRightKey1.push_back(std::get<0>(found->first));
+ expectedRightKey2.push_back(std::get<1>(found->first));
+ expectedRightValue.push_back(found->second);
}
}
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit);
- const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMapNode, leftType, leftList, {0, 1});
- }
- Y_UNIT_TEST(TestLeftMultiOnUint64Uint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
- TVector<ui64> rightKey2Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
- [](const auto& key) { return key * 1001; });
- TVector<TString> rightPayload1Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayload1Init),
- [](const auto& key) { return std::to_string(key); });
- TVector<TString> rightPayload2Init;
- std::transform(rightKey2Init.cbegin(), rightKey2Init.cend(), std::back_inserter(rightPayload2Init),
- [](const auto& key) { return std::to_string(key); });
- // 3. Make "expected" data.
- TMap<std::tuple<ui64, ui64>, TVector<TString>> rightMultiMap;
- for (size_t i = 0; i < rightKey1Init.size(); i++) {
- const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]);
- rightMultiMap[key] = {rightPayload1Init[i], rightPayload2Init[i]};
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- TVector<std::optional<TString>> rightExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
- const auto found = rightMultiMap.find(key);
- if (found != rightMultiMap.cend()) {
- for (const auto& right : found->second) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(right);
- }
- } else {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- rightExpected.push_back(std::nullopt);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected, rightExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
- const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init);
- const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected,
- rightMultiMapNode, leftType, leftList, {0, 1});
- }
-
- Y_UNIT_TEST(TestLeftSemiOnUint64Uint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
- TVector<ui64> rightKey2Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
- [](const auto& key) { return key * 1001; });
- // 3. Make "expected" data.
- TSet<std::tuple<ui64, ui64>> rightSet;
- for (size_t i = 0; i < rightKey1Init.size(); i++) {
- rightSet.emplace(std::make_tuple(rightKey1Init[i], rightKey2Init[i]));
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
- if (rightSet.contains(key)) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected,
- rightSetNode, leftType, leftList, {0, 1});
- }
+ auto [leftType, leftList] = ConvertVectorsToTuples(setup,
+ leftKey1Init, leftKey2Init, leftValueInit);
+ auto [rightType, rightList] = ConvertVectorsToTuples(setup,
+ rightKey1Init, rightKey2Init, rightValueInit);
+ auto [expectedType, expected] = ConvertVectorsToTuples(setup,
+ expectedLeftKey1, expectedLeftKey2, expectedValue, expectedRightKey1, expectedRightKey2, expectedRightValue);
- Y_UNIT_TEST(TestLeftOnlyOnUint64Uint64) {
- TSetup<false> setup;
- TProgramBuilder& pgmBuilder = *setup.PgmBuilder;
- // 1. Make input for the "left" stream.
- TVector<ui64> keyInit(testSize);
- std::iota(keyInit.begin(), keyInit.end(), 1);
- TVector<ui64> subkeyInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit),
- [](const auto key) { return key * 1001; });
- TVector<TString> valueInit;
- std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit),
- [](const auto key) { return threeLetterValues[key]; });
- // 2. Make input for the "right" dict.
- TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend());
- TVector<ui64> rightKey2Init;
- std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init),
- [](const auto& key) { return key * 1001; });
- // 3. Make "expected" data.
- TSet<std::tuple<ui64, ui64>> rightSet;
- for (size_t i = 0; i < rightKey1Init.size(); i++) {
- rightSet.emplace(std::make_tuple(rightKey1Init[i], rightKey2Init[i]));
- }
- TVector<ui64> keyExpected;
- TVector<ui64> subkeyExpected;
- TVector<TString> valueExpected;
- for (size_t i = 0; i < keyInit.size(); i++) {
- const auto key = std::make_tuple(keyInit[i], subkeyInit[i]);
- if (!rightSet.contains(key)) {
- keyExpected.push_back(keyInit[i]);
- subkeyExpected.push_back(subkeyInit[i]);
- valueExpected.push_back(valueInit[i]);
- }
- }
- // 4. Convert input and expected TVectors to List<UV>.
- const auto [leftType, leftList] = ConvertVectorsToTuples(setup,
- keyInit, subkeyInit, valueInit);
- const auto [expectedType, expected] = ConvertVectorsToTuples(setup,
- keyExpected, subkeyExpected, valueExpected);
- // 5. Build "right" computation node.
- const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init);
- const auto rightSetNode = MakeSet(pgmBuilder, rightKeys);
- // 6. Run tests.
- RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected,
- rightSetNode, leftType, leftList, {0, 1});
+ RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected,
+ leftType, std::move(leftList), {0, 1},
+ rightType, std::move(rightList), {0, 1}
+ );
}
} // Y_UNIT_TEST_SUITE
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp
new file mode 100644
index 0000000000..2a65da92ed
--- /dev/null
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp
@@ -0,0 +1,331 @@
+#include "mkql_block_map_join_ut_utils.h"
+
+#include <yql/essentials/minikql/computation/mkql_block_impl.h>
+#include <yql/essentials/minikql/computation/mkql_block_reader.h>
+#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
+#include <yql/essentials/minikql/mkql_node_cast.h>
+#include <yql/essentials/public/udf/arrow/args_dechunker.h>
+#include <yql/essentials/public/udf/arrow/block_builder.h>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+namespace {
+
+// Test-only computation node that wraps a wide stream and makes it yield
+// artificially: only every third WideFetch call (the 1st, 4th, 7th, ...) is
+// forwarded to the wrapped stream, all other calls return Yield right away.
+class TWideStreamThrottlerWrapper: public TMutableComputationNode<TWideStreamThrottlerWrapper> {
+    typedef TMutableComputationNode<TWideStreamThrottlerWrapper> TBaseComputation;
+public:
+    // Stream value that owns the wrapped stream and the call counter.
+    class TStreamValue : public TComputationValue<TStreamValue> {
+    public:
+        using TBase = TComputationValue<TStreamValue>;
+
+        TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue origStream)
+            : TBase(memInfo)
+            , OrigStream_(std::move(origStream))
+        {
+        }
+
+    private:
+        // Fetches one wide row of `width` items into `output`.
+        // Returns the status of the wrapped stream, or Yield for throttled calls.
+        // `output` is written only when the status is Ok.
+        NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
+            // Counter_ is bumped on every call; a non-zero remainder means
+            // this call is throttled and the wrapped stream is not touched.
+            if (Counter_++ % 3) {
+                return NUdf::EFetchStatus::Yield;
+            }
+
+            // Fetch into a scratch buffer so that `output` stays untouched
+            // unless the wrapped stream actually produced a row.
+            TUnboxedValueVector items(width);
+            const auto status = OrigStream_.WideFetch(items.data(), width);
+            if (status == NUdf::EFetchStatus::Ok) {
+                for (size_t i = 0; i < width; i++) {
+                    output[i] = std::move(items[i]);
+                }
+            }
+            // Forward the status as is: unlike a switch without a default
+            // branch, this never lets control flow off the end of the function.
+            return status;
+        }
+
+    private:
+        NUdf::TUnboxedValue OrigStream_;
+        size_t Counter_ = 0;
+    };
+
+    TWideStreamThrottlerWrapper(TComputationMutables& mutables, IComputationNode* origStream)
+        : TBaseComputation(mutables)
+        , OrigStream_(origStream)
+    {
+    }
+
+    // Creates a throttling stream value around the value of the wrapped node.
+    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+        return ctx.HolderFactory.Create<TStreamValue>(OrigStream_->GetValue(ctx));
+    }
+
+private:
+    void RegisterDependencies() const final {
+        DependsOn(OrigStream_);
+    }
+
+private:
+    IComputationNode* OrigStream_;
+};
+
+// Test-only computation node that hides Yield statuses of the wrapped wide
+// stream: the fetch is repeated until the wrapped stream reports Ok or Finish.
+class TWideStreamDethrottlerWrapper: public TMutableComputationNode<TWideStreamDethrottlerWrapper> {
+    typedef TMutableComputationNode<TWideStreamDethrottlerWrapper> TBaseComputation;
+public:
+    // Stream value that owns the wrapped stream and polls it until it stops yielding.
+    class TStreamValue : public TComputationValue<TStreamValue> {
+    public:
+        using TBase = TComputationValue<TStreamValue>;
+
+        TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue origStream)
+            : TBase(memInfo)
+            , OrigStream_(std::move(origStream))
+        {
+        }
+
+    private:
+        // Fetches one wide row of `width` items into `output`; `output` is
+        // written only when Ok is returned.
+        NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
+            TUnboxedValueVector row(width);
+            while (true) {
+                const auto status = OrigStream_.WideFetch(row.data(), width);
+                if (status == NUdf::EFetchStatus::Ok) {
+                    for (size_t column = 0; column < width; column++) {
+                        output[column] = std::move(row[column]);
+                    }
+                    return NUdf::EFetchStatus::Ok;
+                }
+                if (status == NUdf::EFetchStatus::Finish) {
+                    return NUdf::EFetchStatus::Finish;
+                }
+                // Anything else (i.e. Yield): poll the wrapped stream again.
+            }
+        }
+
+    private:
+        NUdf::TUnboxedValue OrigStream_;
+    };
+
+    TWideStreamDethrottlerWrapper(TComputationMutables& mutables, IComputationNode* origStream)
+        : TBaseComputation(mutables)
+        , OrigStream_(origStream)
+    {
+    }
+
+    // Creates a dethrottling stream value around the value of the wrapped node.
+    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+        return ctx.HolderFactory.Create<TStreamValue>(OrigStream_->GetValue(ctx));
+    }
+
+private:
+    void RegisterDependencies() const final {
+        DependsOn(OrigStream_);
+    }
+
+private:
+    IComputationNode* OrigStream_;
+};
+
+// Builds the throttler computation node for a callable with exactly one
+// input, which is taken as the stream to wrap.
+IComputationNode* WrapWideStreamThrottler(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+    MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arg");
+    IComputationNode* const wrapped = LocateNode(ctx.NodeLocator, callable, 0);
+    return new TWideStreamThrottlerWrapper(ctx.Mutables, wrapped);
+}
+
+// Builds the dethrottler computation node for a callable with exactly one
+// input, which is taken as the stream to wrap.
+IComputationNode* WrapWideStreamDethrottler(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+    MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arg");
+    IComputationNode* const wrapped = LocateNode(ctx.NodeLocator, callable, 0);
+    return new TWideStreamDethrottlerWrapper(ctx.Mutables, wrapped);
+}
+
+}
+
+// Builds the block counterpart of the given tuple type: every element type is
+// wrapped into a block of shape Many, and one extra scalar ui64 block (the
+// block length column) is appended as the last element.
+TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType) {
+    const auto elementTypes = AS_TYPE(TTupleType, tupleType)->GetElements();
+    const auto lengthItemType = pgmBuilder.NewDataType(NUdf::TDataType<ui64>::Id);
+    const auto lengthColumnType = pgmBuilder.NewBlockType(lengthItemType, TBlockType::EShape::Scalar);
+
+    TVector<TType*> columnTypes;
+    for (const auto& elementType : elementTypes) {
+        columnTypes.push_back(pgmBuilder.NewBlockType(elementType, TBlockType::EShape::Many));
+    }
+    // XXX: Mind the last block length column.
+    columnTypes.push_back(lengthColumnType);
+
+    return pgmBuilder.NewTupleType(columnTypes);
+}
+
+// Converts a list of tuples into a list of block tuples.
+//
+// ctx       - computation context providing the holder factory, the arrow
+//             memory pool and the Pg builder used by the array builders.
+// blockSize - maximum number of rows fed to the builders per portion; must be
+//             positive.
+// types     - item types of the tuple elements, one per column.
+// values    - list of tuples to convert.
+//
+// Returns a list whose items are tuples of `types.size() + 1` elements: one
+// arrow block per column followed by the block count produced by
+// MakeBlockCount for that chunk.
+NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize,
+    const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values
+) {
+    // With a zero block size the inner loop below never advances the
+    // iterator, so `converted` would never reach `total` and the outer loop
+    // would spin forever on any non-empty input.
+    Y_ENSURE(blockSize > 0, "Block size must be positive");
+
+    // The builders' capacity is derived from the largest item size among all columns.
+    const auto maxLength = CalcBlockLen(std::accumulate(types.cbegin(), types.cend(), 0ULL,
+        [](size_t max, const TType* type) {
+            return std::max(max, CalcMaxBlockItemSize(type));
+        }));
+    TVector<std::unique_ptr<NUdf::IArrayBuilder>> builders;
+    std::transform(types.cbegin(), types.cend(), std::back_inserter(builders),
+        [&](const auto& type) {
+            return MakeArrayBuilder(TTypeInfoHelper(), type, ctx.ArrowMemoryPool,
+                                    maxLength, &ctx.Builder->GetPgBuilder());
+        });
+
+    const auto& holderFactory = ctx.HolderFactory;
+    const size_t width = types.size();
+    const size_t total = values.GetListLength();
+    NUdf::TUnboxedValue iterator = values.GetListIterator();
+    NUdf::TUnboxedValue current;
+    size_t converted = 0;
+    TDefaultListRepresentation listValues;
+    while (converted < total) {
+        // Feed up to blockSize rows into the per-column builders.
+        for (size_t i = 0; i < blockSize && iterator.Next(current); i++, converted++) {
+            for (size_t j = 0; j < builders.size(); j++) {
+                const NUdf::TUnboxedValuePod& item = current.GetElement(j);
+                builders[j]->Add(item);
+            }
+        }
+        std::vector<arrow::Datum> batch;
+        batch.reserve(width);
+        for (size_t i = 0; i < width; i++) {
+            // The flag is true only for the last portion
+            // (presumably it tells the builder to finish -- TODO confirm).
+            batch.emplace_back(builders[i]->Build(converted >= total));
+        }
+
+        // Split the built datums into chunks and emit one output tuple per chunk.
+        NUdf::TArgsDechunker dechunker(std::move(batch));
+        std::vector<arrow::Datum> chunk;
+        ui64 chunkLen = 0;
+        while (dechunker.Next(chunk, chunkLen)) {
+            NUdf::TUnboxedValue* items = nullptr;
+            const auto tuple = holderFactory.CreateDirectArrayHolder(width + 1, items);
+            for (size_t i = 0; i < width; i++) {
+                items[i] = holderFactory.CreateArrowBlock(std::move(chunk[i]));
+            }
+            items[width] = MakeBlockCount(holderFactory, chunkLen);
+
+            listValues = listValues.Append(std::move(tuple));
+        }
+    }
+    return holderFactory.CreateDirectListHolder(std::move(listValues));
+}
+
+// Converts a list of block tuples back into a list of plain tuples.
+//
+// ctx    - computation context providing the holder factory and the Pg builder.
+// types  - block types of the tuple elements; the last one is the block
+//          length column, so the list must not be empty.
+// values - list of block tuples to convert.
+//
+// Returns a list of tuples of `types.size() - 1` elements, one tuple per row
+// of every block tuple, in the order the blocks and rows are stored.
+NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx,
+    const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values
+) {
+    // `types.size() - 1` below would wrap around for an empty type list and
+    // turn every element access into an out-of-range one.
+    Y_ENSURE(!types.empty(), "Block tuple must contain at least the block length column");
+
+    TVector<std::unique_ptr<IBlockReader>> readers;
+    TVector<std::unique_ptr<IBlockItemConverter>> converters;
+    for (const auto& type : types) {
+        const auto blockItemType = AS_TYPE(TBlockType, type)->GetItemType();
+        readers.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
+        converters.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType,
+                                                    ctx.Builder->GetPgBuilder()));
+    }
+
+    const auto& holderFactory = ctx.HolderFactory;
+    // The last element of each tuple is the block length, not a data column.
+    const size_t width = types.size() - 1;
+    TDefaultListRepresentation listValues;
+    NUdf::TUnboxedValue iterator = values.GetListIterator();
+    NUdf::TUnboxedValue current;
+    while (iterator.Next(current)) {
+        // Read the number of rows stored in this block tuple.
+        const auto blockLengthValue = current.GetElement(width);
+        const auto blockLengthDatum = TArrowBlock::From(blockLengthValue).GetDatum();
+        Y_ENSURE(blockLengthDatum.is_scalar());
+        const auto blockLength = blockLengthDatum.scalar_as<arrow::UInt64Scalar>().value;
+        // Materialize every row as a tuple of `width` unboxed values.
+        for (size_t i = 0; i < blockLength; i++) {
+            NUdf::TUnboxedValue* items = nullptr;
+            const auto tuple = holderFactory.CreateDirectArrayHolder(width, items);
+            for (size_t j = 0; j < width; j++) {
+                const auto arrayValue = current.GetElement(j);
+                const auto arrayDatum = TArrowBlock::From(arrayValue).GetDatum();
+                UNIT_ASSERT(arrayDatum.is_array());
+                const auto blockItem = readers[j]->GetItem(*arrayDatum.array(), i);
+                items[j] = converters[j]->MakeValue(blockItem, holderFactory);
+            }
+            listValues = listValues.Append(std::move(tuple));
+        }
+    }
+    return holderFactory.CreateDirectListHolder(std::move(listValues));
+}
+
+// Returns a node factory that handles the two test-only callables
+// ("WideStreamThrottler" and "WideStreamDethrottler") and delegates every
+// other callable to the builtin factory.
+TComputationNodeFactory GetNodeFactory() {
+    return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
+        const auto& callableName = callable.GetType()->GetName();
+        if (callableName == "WideStreamThrottler") {
+            return WrapWideStreamThrottler(callable, ctx);
+        }
+        if (callableName == "WideStreamDethrottler") {
+            return WrapWideStreamDethrottler(callable, ctx);
+        }
+        return GetBuiltinFactory()(callable, ctx);
+    };
+}
+
+// Wraps the given stream node into a "WideStreamThrottler" callable whose
+// return type equals the static type of the stream.
+TRuntimeNode ThrottleStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream) {
+    TCallableBuilder throttler(pgmBuilder.GetTypeEnvironment(), "WideStreamThrottler", stream.GetStaticType());
+    throttler.Add(stream);
+    const auto callable = throttler.Build();
+    return TRuntimeNode(callable, false);
+}
+
+// Wraps the given stream node into a "WideStreamDethrottler" callable whose
+// return type equals the static type of the stream.
+TRuntimeNode DethrottleStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream) {
+    TCallableBuilder dethrottler(pgmBuilder.GetTypeEnvironment(), "WideStreamDethrottler", stream.GetStaticType());
+    dethrottler.Add(stream);
+    const auto callable = dethrottler.Build();
+    return TRuntimeNode(callable, false);
+}
+
+// Copies all items of the given list value into a vector, preserving the
+// iteration order of the list.
+TVector<NUdf::TUnboxedValue> ConvertListToVector(const NUdf::TUnboxedValue& list) {
+    TVector<NUdf::TUnboxedValue> result;
+    NUdf::TUnboxedValue cursor = list.GetListIterator();
+    for (NUdf::TUnboxedValue item; cursor.Next(item);) {
+        result.push_back(item);
+    }
+    return result;
+}
+
+// Asserts that two list values hold the same items regardless of their order:
+// both lists are copied into vectors, sorted with the comparator built for the
+// list item type and then checked for pairwise equality.
+//
+// type     - list type of both values.
+// expected - reference list.
+// got      - list under test.
+void CompareResults(const TType* type, const NUdf::TUnboxedValue& expected,
+    const NUdf::TUnboxedValue& got
+) {
+    const auto rowType = AS_TYPE(TListType, type)->GetItemType();
+    const NUdf::ICompare::TPtr rowCompare = MakeCompareImpl(rowType);
+    const NUdf::IEquate::TPtr rowEquate = MakeEquateImpl(rowType);
+    // XXX: Neither key types nor the tuple flag carry real data here,
+    // since the ICompare/IEquate implementations are passed explicitly.
+    TKeyTypes unusedKeyTypes;
+    bool unusedIsTuple = false;
+    const TValueLess valueLess(unusedKeyTypes, unusedIsTuple, rowCompare.Get());
+    const TValueEqual valueEqual(unusedKeyTypes, unusedIsTuple, rowEquate.Get());
+
+    auto expectedItems = ConvertListToVector(expected);
+    auto gotItems = ConvertListToVector(got);
+    UNIT_ASSERT_VALUES_EQUAL(expectedItems.size(), gotItems.size());
+
+    // Bring both sides to the same canonical order before comparing.
+    Sort(expectedItems, valueLess);
+    Sort(gotItems, valueLess);
+
+    const size_t itemCount = expectedItems.size();
+    for (size_t i = 0; i != itemCount; ++i) {
+        UNIT_ASSERT(valueEqual(gotItems[i], expectedItems[i]));
+    }
+}
+
+// Generates all strings of exactly `level` letters over the alphabet 'A'..'Z'.
+//
+// level - length of the generated strings; must be positive.
+//
+// Returns 26^level strings: for level 1 the letters 'A'..'Z' in order, for
+// higher levels every letter 'A'..'Z' prepended, in order, to each string of
+// the previous level.
+TVector<TString> GenerateValues(size_t level) {
+    // For level == 0 the `level - 1` below wraps around, so the recursion
+    // would never reach the base case.
+    Y_ENSURE(level > 0, "Level must be positive");
+
+    constexpr size_t alphaSize = 'Z' - 'A' + 1;
+    if (level == 1) {
+        TVector<TString> alphabet(alphaSize);
+        std::iota(alphabet.begin(), alphabet.end(), 'A');
+        return alphabet;
+    }
+    const auto subValues = GenerateValues(level - 1);
+    TVector<TString> values;
+    values.reserve(alphaSize * subValues.size());
+    for (char ch = 'A'; ch <= 'Z'; ch++) {
+        for (const auto& tail : subValues) {
+            values.emplace_back(ch + tail);
+        }
+    }
+    return values;
+}
+
+// Builds a set holding 0 and the Fibonacci numbers produced by `count`
+// iterations of the recurrence started from the pair (0, 1), i.e. 1, 2, 3, 5, ...
+TSet<ui64> GenerateFibonacci(size_t count) {
+    TSet<ui64> numbers;
+    ui64 previous = 0;
+    ui64 latest = 1;
+    numbers.insert(previous);
+    for (size_t step = 0; step < count; ++step) {
+        const ui64 next = previous + latest;
+        previous = latest;
+        latest = next;
+        numbers.insert(latest);
+    }
+    return numbers;
+}
+
+} // namespace NMiniKQL
+} // namespace NKikimr
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h
new file mode 100644
index 0000000000..2c2f32b312
--- /dev/null
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h
@@ -0,0 +1,113 @@
+#pragma once
+
+#include "mkql_computation_node_ut.h"
+
+#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
+
+#include <algorithm>
+#include <iterator>
+#include <optional>
+#include <utility>
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+// Tells whether the type reports itself as Optional, Null or Pg.
+inline bool IsOptionalOrNull(const TType* type) {
+    if (type->IsOptional()) {
+        return true;
+    }
+    if (type->IsNull()) {
+        return true;
+    }
+    return type->IsPg();
+}
+
+TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType);
+
+NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize,
+ const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values);
+NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx,
+ const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values);
+
+TComputationNodeFactory GetNodeFactory();
+TRuntimeNode ThrottleStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream);
+TRuntimeNode DethrottleStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream);
+
+TVector<NUdf::TUnboxedValue> ConvertListToVector(const NUdf::TUnboxedValue& list);
+void CompareResults(const TType* type, const NUdf::TUnboxedValue& expected, const NUdf::TUnboxedValue& got);
+
+TVector<TString> GenerateValues(size_t level);
+TSet<ui64> GenerateFibonacci(size_t count);
+
+//
+// Auxiliary routines to build list nodes from the given vectors.
+//
+
+// Common state of all type mappers: the program builder used to create
+// literals and the MiniKQL type the mapped C++ type corresponds to.
+struct TTypeMapperBase {
+    TProgramBuilder& Pb;
+    TType* ItemType;
+
+    // Returns the MiniKQL type of a single item.
+    TType* GetType() {
+        return ItemType;
+    }
+};
+
+// Maps a C++ type to the data type with the id NUdf::TDataType<Type>::Id and
+// builds data literals of that type.
+template <typename Type>
+struct TTypeMapper: TTypeMapperBase {
+    TTypeMapper(TProgramBuilder& pb)
+        : TTypeMapperBase {pb, pb.NewDataType(NUdf::TDataType<Type>::Id) }
+    {}
+
+    // Builds a data literal node holding the given value.
+    auto GetValue(const Type& value) {
+        return Pb.NewDataLiteral<Type>(value);
+    }
+};
+
+// Maps TString to the String data slot and builds String literals.
+template <>
+struct TTypeMapper<TString>: TTypeMapperBase {
+    TTypeMapper(TProgramBuilder& pb)
+        : TTypeMapperBase {pb, pb.NewDataType(NUdf::EDataSlot::String)}
+    {}
+
+    // Builds a String literal node holding the given value.
+    auto GetValue(const TString& value) {
+        return Pb.NewDataLiteral<NUdf::EDataSlot::String>(value);
+    }
+};
+
+// Maps std::optional<TNested> to the optional of the type TNested is mapped
+// to: an empty std::optional becomes an empty optional node, an engaged one
+// wraps the node built for the nested value.
+template <typename TNested>
+class TTypeMapper<std::optional<TNested>>: TTypeMapper<TNested> {
+    using TBase = TTypeMapper<TNested>;
+public:
+    TTypeMapper(TProgramBuilder& pb)
+        : TBase(pb)
+    {}
+
+    // Returns the optional type built over the nested item type.
+    auto GetType() {
+        return TBase::Pb.NewOptionalType(TBase::GetType());
+    }
+
+    // Builds the optional node for the given value.
+    auto GetValue(const std::optional<TNested>& value) {
+        if (value.has_value()) {
+            return TBase::Pb.NewOptional(TBase::GetValue(*value));
+        }
+        return TBase::Pb.NewEmptyOptional(GetType());
+    }
+};
+
+// Builds a single list node from the given vector: every item is converted
+// into a literal by TTypeMapper<Type>, and the resulting list node is
+// returned as the only element of the vector.
+template<typename Type>
+const TVector<const TRuntimeNode> BuildListNodes(TProgramBuilder& pb,
+    const TVector<Type>& vector
+) {
+    TTypeMapper<Type> mapper(pb);
+
+    TRuntimeNode::TList literals;
+    for (const auto& value : vector) {
+        literals.push_back(mapper.GetValue(value));
+    }
+
+    return {pb.NewList(mapper.GetType(), literals)};
+}
+
+// Builds one list node per given vector and returns them in the order the
+// vectors are passed: the node for the first vector, then the nodes built
+// for the rest of the pack.
+template<typename Type, typename... Tail>
+const TVector<const TRuntimeNode> BuildListNodes(TProgramBuilder& pb,
+    const TVector<Type>& vector, Tail... vectors
+) {
+    const auto headNodes = BuildListNodes(pb, vector);
+    const auto restNodes = BuildListNodes(pb, std::forward<Tail>(vectors)...);
+
+    TVector<const TRuntimeNode> nodes;
+    nodes.reserve(restNodes.size() + 1);
+    nodes.push_back(headNodes.front());
+    for (const auto& node : restNodes) {
+        nodes.push_back(node);
+    }
+    return nodes;
+}
+
+// Zips the given vectors into a list of tuples: builds one list node per
+// vector, zips the nodes and evaluates the resulting graph.
+// Returns the static type of the zipped node together with its computed value.
+template<typename... TVectors>
+const std::pair<TType*, NUdf::TUnboxedValue> ConvertVectorsToTuples(
+    TSetup<false>& setup, TVectors... vectors
+) {
+    TProgramBuilder& builder = *setup.PgmBuilder;
+    const auto columnNodes = BuildListNodes(builder, std::forward<TVectors>(vectors)...);
+    const auto zippedNode = builder.Zip(columnNodes);
+    const auto zippedType = zippedNode.GetStaticType();
+    const auto zippedValue = setup.BuildGraph(zippedNode)->GetValue();
+    return std::make_pair(zippedType, zippedValue);
+}
+
+} // namespace NMiniKQL
+} // namespace NKikimr
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp
index c5f3c53ed4..bf68e92ea4 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp
@@ -375,6 +375,90 @@ Y_UNIT_TEST_SUITE(TMiniKQLRobinHoodHashTest) {
Cerr << "maxDistance2: " << maxDistance2 << "\n";
UNIT_ASSERT(maxDistance2 < 20);
}
+
+    // Cross-checks TRobinHoodHashMap::Lookup against std::unordered_map: both
+    // maps accumulate the same per-key sums, then every key is looked up in
+    // the Robin Hood map and its payload is compared with the reference sum.
+    Y_UNIT_TEST(Lookup) {
+        TRobinHoodHashMap<i32> rh(sizeof(i64));
+        std::unordered_map<i32, i64> h;
+        // 10000 insertions over 1000 distinct keys: each key is inserted once
+        // and updated nine more times.
+        for (ui64 i = 0; i < 10000; ++i) {
+            auto k = i % 1000;
+            auto [it, inserted] = h.emplace(k, 0);
+            bool isNew;
+            auto iter = rh.Insert(k, isNew);
+            UNIT_ASSERT_VALUES_EQUAL(rh.GetKey(iter), k);
+            UNIT_ASSERT_VALUES_EQUAL(isNew, inserted);
+            it->second += i;
+            if (isNew) {
+                *(i64*)rh.GetMutablePayload(iter) = i;
+                rh.CheckGrow();
+            } else {
+                *(i64*)rh.GetMutablePayload(iter) += i;
+            }
+
+            UNIT_ASSERT_VALUES_EQUAL(h.size(), rh.GetSize());
+        }
+
+        for (ui64 i = 0; i < 1000; ++i) {
+            auto iter = rh.Lookup(i);
+            auto hit = h.find(i);
+            // Fail the test instead of dereferencing/erasing end() if the
+            // reference map does not hold the key.
+            UNIT_ASSERT(hit != h.end());
+            UNIT_ASSERT_VALUES_EQUAL(*(i64*)rh.GetPayload(iter), hit->second);
+            h.erase(hit);
+        }
+
+        // Every reference key must have been visited exactly once.
+        UNIT_ASSERT(h.empty());
+    }
+
+    // Cross-checks TRobinHoodHashMap::BatchLookup against std::unordered_map:
+    // both maps accumulate the same per-key sums, then the keys are looked up
+    // in batches of up to PrefetchBatchSize requests and every reported
+    // payload is compared with the reference sum.
+    Y_UNIT_TEST(BatchLookup) {
+        using THashTable = TRobinHoodHashMap<i32>;
+        THashTable rh(sizeof(i64));
+        std::unordered_map<i32, i64> h;
+        // 10000 insertions over 1000 distinct keys: each key is inserted once
+        // and updated nine more times.
+        for (ui64 i = 0; i < 10000; ++i) {
+            auto k = i % 1000;
+            auto [it, inserted] = h.emplace(k, 0);
+            bool isNew;
+            auto iter = rh.Insert(k, isNew);
+            UNIT_ASSERT_VALUES_EQUAL(rh.GetKey(iter), k);
+            UNIT_ASSERT_VALUES_EQUAL(isNew, inserted);
+            it->second += i;
+            if (isNew) {
+                *(i64*)rh.GetMutablePayload(iter) = i;
+                rh.CheckGrow();
+            } else {
+                *(i64*)rh.GetMutablePayload(iter) += i;
+            }
+
+            UNIT_ASSERT_VALUES_EQUAL(h.size(), rh.GetSize());
+        }
+
+        // batch[i] is the request, batchI[i] remembers the key it was built from.
+        std::array<TRobinHoodBatchRequestItem<i32>, PrefetchBatchSize> batch;
+        std::array<ui64, PrefetchBatchSize> batchI;
+        ui32 batchLen = 0;
+
+        auto processBatch = [&]() {
+            rh.BatchLookup({batch.data(), batchLen}, [&](size_t i, THashTable::iterator iter) {
+                auto key = batchI[i];
+                auto hit = h.find(key);
+                // Fail the test instead of dereferencing/erasing end() if the
+                // reference map does not hold the key.
+                UNIT_ASSERT(hit != h.end());
+                UNIT_ASSERT_VALUES_EQUAL(*(i64*)rh.GetPayload(iter), hit->second);
+                h.erase(hit);
+            });
+        };
+
+        for (ui64 i = 0; i < 1000; ++i) {
+            // Flush the batch once it is full before queueing the next key.
+            if (batchLen == batch.size()) {
+                processBatch();
+                batchLen = 0;
+            }
+
+            batch[batchLen].ConstructKey(i);
+            batchI[batchLen] = i;
+            ++batchLen;
+        }
+
+        // Flush the last, possibly incomplete batch.
+        if (batchLen > 0) {
+            processBatch();
+        }
+
+        // Every reference key must have been visited exactly once.
+        UNIT_ASSERT(h.empty());
+    }
}
}
diff --git a/yql/essentials/minikql/comp_nodes/ut/ya.make.inc b/yql/essentials/minikql/comp_nodes/ut/ya.make.inc
index fc600ad3c4..46a1ad7f11 100644
--- a/yql/essentials/minikql/comp_nodes/ut/ya.make.inc
+++ b/yql/essentials/minikql/comp_nodes/ut/ya.make.inc
@@ -24,6 +24,7 @@ SET(ORIG_SOURCES
mkql_block_compress_ut.cpp
mkql_block_exists_ut.cpp
mkql_block_skiptake_ut.cpp
+ mkql_block_map_join_ut_utils.cpp
mkql_block_map_join_ut.cpp
mkql_block_top_sort_ut.cpp
mkql_blocks_ut.cpp