diff options
author | zverevgeny <zverevgeny@ydb.tech> | 2023-09-19 11:35:14 +0300 |
---|---|---|
committer | zverevgeny <zverevgeny@ydb.tech> | 2023-09-19 11:56:56 +0300 |
commit | 1176941d342dc85b5c1cba12648cb0d568c5b533 (patch) | |
tree | 5a6147449c77ce3545fef9a2832a7c8212ce4ad8 | |
parent | 840f2b94194666359166ff9da34ec237aeded9b5 (diff) | |
download | ydb-1176941d342dc85b5c1cba12648cb0d568c5b533.tar.gz |
YQL-16443 sparse list for MATCH_RECOGNIZE on streams
10 files changed, 573 insertions, 108 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp index 244d23c37b..dd82f81302 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize.cpp @@ -1,3 +1,4 @@ +#include "mkql_match_recognize_list.h" #include "mkql_match_recognize_matched_vars.h" #include "mkql_match_recognize_measure_arg.h" #include <ydb/library/yql/core/sql_types/match_recognize.h> @@ -41,6 +42,9 @@ struct TMatchRecognizeProcessorParameters { }; class TBackTrackingMatchRecognize: public IProcessMatchRecognize { + using TPartitionList = TSimpleList; + using TRange = TPartitionList::TRange; + using TMatchedVars = TMatchedVars<TRange>; public: TBackTrackingMatchRecognize( NUdf::TUnboxedValue&& partitionKey, @@ -57,7 +61,7 @@ public: bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override { Y_UNUSED(ctx); - Rows.push_back(std::move(row)); + Rows.Append(std::move(row)); return false; } NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override { @@ -89,14 +93,14 @@ public: bool ProcessEndOfData(TComputationContext& ctx) override { //Assume, that data moved to IComputationExternalNode node, will not be modified or released //till the end of the current function - auto rowsSize = Rows.size(); - Parameters.InputDataArg->SetValue(ctx, ctx.HolderFactory.VectorAsVectorHolder(std::move(Rows))); + auto rowsSize = Rows.Size(); + Parameters.InputDataArg->SetValue(ctx, ctx.HolderFactory.Create<TListValue<TPartitionList>>(Rows)); for (size_t i = 0; i != rowsSize; ++i) { Parameters.CurrentRowIndexArg->SetValue(ctx, NUdf::TUnboxedValuePod(static_cast<ui64>(i))); for (size_t v = 0; v != Parameters.Defines.size(); ++v) { const auto &d = Parameters.Defines[v]->GetValue(ctx); if (d && d.GetOptionalValue().Get<bool>()) { - Extend(CurMatchedVars[v], i); + Extend(CurMatchedVars[v], TRange{i}); } } //for the sake of dummy usage assume non-overlapped matches at every 5th row of any partition @@ -113,13 +117,16 @@ private: const NUdf::TUnboxedValue PartitionKey; const TMatchRecognizeProcessorParameters& Parameters; const TContainerCacheOnContext& Cache; - TUnboxedValueVector Rows; + TSimpleList Rows; TMatchedVars CurMatchedVars; std::deque<TMatchedVars> Matches; ui64 MatchNumber; }; class TStreamingMatchRecognize: public IProcessMatchRecognize { + using TPartitionList = TSparseList; + using TRange = TPartitionList::TRange; + using TMatchedVars = TMatchedVars<TRange>; public: TStreamingMatchRecognize( NUdf::TUnboxedValue&& partitionKey, @@ -131,20 +138,19 @@ public: , Cache(cache) , MatchedVars(parameters.Defines.size()) , HasMatch(false) - , RowCount(0) { } bool ProcessInputRow(NUdf::TUnboxedValue&& row, TComputationContext& ctx) override{ Y_UNUSED(row); - Parameters.CurrentRowIndexArg->SetValue(ctx, NUdf::TUnboxedValuePod(RowCount)); + Parameters.CurrentRowIndexArg->SetValue(ctx, NUdf::TUnboxedValuePod(Rows.Size())); + auto r = Rows.Append(std::move(row)); for (size_t i = 0; i != Parameters.Defines.size(); ++i) { const auto& d = Parameters.Defines[i]->GetValue(ctx); if (d && d.GetOptionalValue().Get<bool>()) { - Extend(MatchedVars[i], RowCount); + Extend(MatchedVars[i], r); } } - ++RowCount; return HasMatch; } NUdf::TUnboxedValue GetOutputIfReady(TComputationContext& ctx) override { @@ -177,7 +183,7 @@ private: const TContainerCacheOnContext& Cache; TMatchedVars MatchedVars; bool HasMatch; - size_t RowCount; + TSparseList Rows; }; class TStateForNonInterleavedPartitions diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize_list.h b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize_list.h new file mode 100644 index 0000000000..749cbb8443 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize_list.h @@ -0,0 +1,335 @@ +#pragma once +#include <ydb/library/yql/minikql/defs.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/public/udf/udf_value.h> +#include <unordered_map> + +namespace NKikimr::NMiniKQL::NMatchRecognize { + +class TSimpleList { +public: + ///Range that includes starting and ending points + ///Can not be empty + class TRange { + public: + TRange() + : FromIndex(-1) + , ToIndex(-1) + { + } + + explicit TRange(ui64 index) + : FromIndex(index) + , ToIndex(index) + { + } + + TRange(ui64 from, ui64 to) + : FromIndex(from) + , ToIndex(to) + { + MKQL_ENSURE(FromIndex <= ToIndex, "Internal logic error"); + } + + bool IsValid() const { + return true; + } + + size_t From() const { + MKQL_ENSURE(IsValid(), "Internal logic error"); + return FromIndex; + } + + size_t To() const { + MKQL_ENSURE(IsValid(), "Internal logic error"); + return ToIndex; + } + + size_t Size() const { + MKQL_ENSURE(IsValid(), "Internal logic error"); + return ToIndex - FromIndex + 1; + } + + void Extend() { + MKQL_ENSURE(IsValid(), "Internal logic error"); + ++ToIndex; + } + + private: + ui64 FromIndex; + ui64 ToIndex; + }; + + TRange Append(NUdf::TUnboxedValue&& value) { + TRange result(Rows.size()); + Rows.push_back(std::move(value)); + return result; + } + + size_t Size() const { + return Rows.size(); + } + + bool Empty() const { + return Size() == 0; + } + + NUdf::TUnboxedValue Get(size_t i) const { + return Rows.at(i); + } +private: + TUnboxedValueVector Rows; +}; + +///Stores only locked items +///Locks are holds by TRange +///When all locks on an item are released, the item is removed from the list +class TSparseList { + struct TItem { + NUdf::TUnboxedValue Value; + size_t LockCount = 0; + }; + + class TContainer: public TSimpleRefCount<TContainer> { + public: + using TPtr = TIntrusivePtr<TContainer>; + + void Add(size_t index, NUdf::TUnboxedValue&& value) { + const auto& [iter, newOne] = Storage.emplace(index, TItem{std::move(value), 1}); + MKQL_ENSURE(newOne, "Internal logic error"); + } + + size_t Size() const { + return Storage.size(); + } + + NUdf::TUnboxedValue Get(size_t i) const { + if (const auto it = Storage.find(i); it != Storage.cend()) { + return it->second.Value; + } else { + return NUdf::TUnboxedValue{}; + } + } + + void LockRange(size_t from, size_t to) { + for (auto i = from; i <= to; ++i) { + const auto it = Storage.find(i); + MKQL_ENSURE(it != Storage.cend(), "Internal logic error"); + ++it->second.LockCount; + } + } + + void UnlockRange(size_t from, size_t to) { + for (auto i = from; i <= to; ++i) { + const auto it = Storage.find(i); + MKQL_ENSURE(it != Storage.cend(), "Internal logic error"); + auto lockCount = --it->second.LockCount; + if (0 == lockCount) { + Storage.erase(it); + } + } + } + + private: + //TODO consider to replace hash table with contiguous chunks + using TAllocator = TMKQLAllocator<std::pair<const size_t, TItem>, EMemorySubPool::Temporary>; + std::unordered_map< + size_t, + TItem, + std::hash<size_t>, + std::equal_to<size_t>, + TAllocator> Storage; + }; + using TContainerPtr = TContainer::TPtr; + +public: + ///Range that includes starting and ending points + ///Holds a lock on items in the list + ///Can not be empty, but can be in invalid state, with no container set + class TRange{ + friend class TSparseList; + public: + TRange() + : Container() + , FromIndex(-1) + , ToIndex(-1) + { + } + + TRange(const TRange& other) + : Container(other.Container) + , FromIndex(other.FromIndex) + , ToIndex(other.ToIndex) + { + LockRange(FromIndex, ToIndex); + } + + TRange(TRange&& other) + : Container(other.Container) + , FromIndex(other.FromIndex) + , ToIndex(other.ToIndex) + { + other.Reset(); + } + + ~TRange() { + Release(); + } + + TRange& operator=(const TRange& other) { + if (&other == this) { + return *this; + } + //TODO(zverevgeny): optimize for overlapped source and destination + Release(); + Container = other.Container; + FromIndex = other.FromIndex; + ToIndex = other.ToIndex; + LockRange(FromIndex, ToIndex); + return *this; + } + + TRange& operator=(TRange&& other) { + if (&other == this) { + return *this; + } + Release(); + Container = other.Container; + FromIndex = other.FromIndex; + ToIndex = other.ToIndex; + other.Reset(); + return *this; + } + + bool IsValid() const { + return static_cast<bool>(Container); + } + + size_t From() const { + MKQL_ENSURE(IsValid(), "Internal logic error"); + return FromIndex; + } + + size_t To() const { + MKQL_ENSURE(IsValid(), "Internal logic error"); + return ToIndex; + } + + size_t Size() const { + MKQL_ENSURE(IsValid(), "Internal logic error"); + return ToIndex - FromIndex + 1; + } + + void Extend() { + MKQL_ENSURE(IsValid(), "Internal logic error"); + ++ToIndex; + LockRange(ToIndex, ToIndex); + } + + void Release() { + UnlockRange(FromIndex, ToIndex); + Container.Reset(); + FromIndex = -1; + ToIndex = -1; + } + + private: + TRange(TContainerPtr container, size_t index) + : Container(container) + , FromIndex(index) + , ToIndex(index) + {} + + void LockRange(size_t from, size_t to) { + if (Container) { + Container->LockRange(from, to); + } + } + + void UnlockRange(size_t from, size_t to) { + if (Container) { + Container->UnlockRange(from, to); + } + } + + void Reset() { + Container.Reset(); + FromIndex = -1; + ToIndex = -1; + } + + TContainerPtr Container; + size_t FromIndex; + size_t ToIndex; + }; + +public: + TRange Append(NUdf::TUnboxedValue&& value) { + const auto index = ListSize++; + Container->Add(index, std::move(value)); + return TRange(Container, index); + } + + NUdf::TUnboxedValue Get(size_t i) const { + return Container->Get(i); + } + + ///Return total size of sparse list including absent values + size_t Size() const { + return ListSize; + } + + ///Return number of present values in sparse list + size_t Filled() const { + return Container->Size(); + } + + bool Empty() const { + return Size() == 0; + } + +private: + TContainerPtr Container = MakeIntrusive<TContainer>(); + size_t ListSize = 0; //impl: max index ever stored + 1 +}; + +template<typename L> +class TListValue: public TComputationValue<TListValue<L>> { +public: + TListValue(TMemoryUsageInfo* memUsage, const L& list) + : TComputationValue<TListValue<L>>(memUsage) + , List(list) + { + } + + //TODO https://st.yandex-team.ru/YQL-16508 + //NUdf::TUnboxedValue GetListIterator() const override; + + bool HasFastListLength() const override { + return !List.Empty(); + } + + ui64 GetListLength() const override { + return List.Size(); + } + + bool HasListItems() const override { + return !List.Empty(); + } + + NUdf::IBoxedValuePtr ToIndexDictImpl(const NUdf::IValueBuilder& builder) const override { + Y_UNUSED(builder); + return const_cast<TListValue*>(this); + } + + NUdf::TUnboxedValue Lookup(const NUdf::TUnboxedValuePod& key) const override { + return List.Get(key.Get<ui64>()); + } + +private: + L List; +}; + +}//namespace NKikimr::NMiniKQL::NMatchRecognize + diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize_matched_vars.h b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize_matched_vars.h index 8c62e0eb7e..31d7cf5cd7 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize_matched_vars.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_match_recognize_matched_vars.h @@ -1,62 +1,38 @@ #pragma once +#include "mkql_match_recognize_list.h" #include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> -namespace NKikimr::NMiniKQL::NMatchRecognize { - -///Range that includes starting and ending points -///Can not be empty -class TMatchedRange { -public: - TMatchedRange(ui64 index) - : FromIndex(index) - , ToIndex(index) - {} - - TMatchedRange(ui64 from, ui64 to) - : FromIndex(from) - , ToIndex(to) - {} - - size_t From() const { - return FromIndex; - } - - size_t To() const { - return ToIndex; - } - void Extend() { - ++ToIndex; - } - -private: - ui64 FromIndex; - ui64 ToIndex; -}; +namespace NKikimr::NMiniKQL::NMatchRecognize { -using TMatchedVar = std::vector<TMatchedRange>; -inline void Extend(TMatchedVar& var, size_t index) { +template<class R> +using TMatchedVar = std::vector<R>; +template<class R> +void Extend(TMatchedVar<R>& var, const R& r) { if (var.empty()) { - var.emplace_back(index); + var.emplace_back(r); } else { - MKQL_ENSURE(index > var.back().To(), "Internal logic error"); - if (var.back().To() + 1 == index) { + MKQL_ENSURE(r.From() > var.back().To(), "Internal logic error"); + if (var.back().To() + 1 == r.From()) { var.back().Extend(); } else { - var.emplace_back(index); + var.emplace_back(r); } } } -using TMatchedVars = std::vector<TMatchedVar>; +template<class R> +using TMatchedVars = std::vector<TMatchedVar<R>>; -inline NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const TMatchedRange& range) { +template<class R> +NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const R& range) { std::array<NUdf::TUnboxedValue, 2> array = {NUdf::TUnboxedValuePod{range.From()}, NUdf::TUnboxedValuePod{range.To()}}; return holderFactory.RangeAsArray(cbegin(array), cend(array)); } -inline NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const TMatchedVar& var) { +template<class R> +NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const TMatchedVar<R>& var) { TUnboxedValueVector data; data.reserve(var.size()); for (const auto& r: var) { @@ -65,7 +41,8 @@ inline NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const TM return holderFactory.VectorAsVectorHolder(std::move(data)); } -inline NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const TMatchedVars& vars) { +template<class R> +inline NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const TMatchedVars<R>& vars) { NUdf::TUnboxedValue* ptr; auto result = holderFactory.CreateDirectArrayHolder(vars.size(), ptr); for (const auto& v: vars) { @@ -76,35 +53,14 @@ inline NUdf::TUnboxedValue ToValue(const THolderFactory& holderFactory, const TM ///Optimized reference based implementation to be used as an argument ///for lambdas which produce strict result(do not require lazy access to its arguments) -class TMatchedVarsValue : public TComputationValue<TMatchedVarsValue> { - class TRangeValue: public TComputationValue<TRangeValue> { - public: - TRangeValue(TMemoryUsageInfo* memInfo, const TMatchedRange& r) - : TComputationValue<TRangeValue>(memInfo) - , Range(r) - { - } - - NUdf::TUnboxedValue* GetElements() const override { - return nullptr; - } - NUdf::TUnboxedValue GetElement(ui32 index) const override { - MKQL_ENSURE(index < 2, "Index out of range"); - switch(index) { - case 0: return NUdf::TUnboxedValuePod(Range.From()); - case 1: return NUdf::TUnboxedValuePod(Range.To()); - } - return NUdf::TUnboxedValuePod(); - } - private: - const TMatchedRange& Range; - }; - +template<class R> +class TMatchedVarsValue : public TComputationValue<TMatchedVarsValue<R>> { class TRangeList: public TComputationValue<TRangeList> { class TIterator : public TComputationValue<TIterator> { public: - TIterator(TMemoryUsageInfo *memInfo, const std::vector<TMatchedRange>& ranges) + TIterator(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, const std::vector<R>& ranges) : TComputationValue<TIterator>(memInfo) + , HolderFactory(holderFactory) , Ranges(ranges) , Index(0) {} @@ -114,17 +70,18 @@ class TMatchedVarsValue : public TComputationValue<TMatchedVarsValue> { if (Ranges.size() == Index){ return false; } - value = NUdf::TUnboxedValuePod(new TRangeValue(GetMemInfo(), Ranges[Index++])); + value = ToValue(HolderFactory, Ranges[Index++]); return true; } - - const std::vector<TMatchedRange>& Ranges; + const THolderFactory& HolderFactory; + const std::vector<R>& Ranges; size_t Index; }; public: - TRangeList(TMemoryUsageInfo* memInfo, const TMatchedVar& v) + TRangeList(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, const TMatchedVar<R>& v) : TComputationValue<TRangeList>(memInfo) + , HolderFactory(holderFactory) , Var(v) { } @@ -142,23 +99,26 @@ class TMatchedVarsValue : public TComputationValue<TMatchedVarsValue> { } NUdf::TUnboxedValue GetListIterator() const override { - return NUdf::TUnboxedValuePod(new TIterator(GetMemInfo(), Var)); + return HolderFactory.Create<TIterator>(HolderFactory, Var); } private: - const TMatchedVar& Var; + const THolderFactory& HolderFactory; + const TMatchedVar<R>& Var; }; public: - TMatchedVarsValue(TMemoryUsageInfo* memInfo, const std::vector<TMatchedVar>& vars) - : TComputationValue<TMatchedVarsValue>(memInfo) - , Vars(vars) + TMatchedVarsValue(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, const std::vector<TMatchedVar<R>>& vars) + : TComputationValue<TMatchedVarsValue>(memInfo) + , HolderFactory(holderFactory) + , Vars(vars) { } NUdf::TUnboxedValue GetElement(ui32 index) const override { - return NUdf::TUnboxedValuePod(new TRangeList(GetMemInfo(), Vars[index])); + return HolderFactory.Create<TRangeList>(HolderFactory, Vars[index]); } private: - const std::vector<TMatchedVar>& Vars; + const THolderFactory& HolderFactory; + const std::vector<TMatchedVar<R>>& Vars; }; }//namespace NKikimr::NMiniKQL::NMatchRecognize diff --git a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.darwin-x86_64.txt index 93bf9c2bbb..c1a21d9c99 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.darwin-x86_64.txt @@ -61,6 +61,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_matched_vars_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_list_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-aarch64.txt index b40a8f83e2..81ac3b2d11 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-aarch64.txt @@ -64,6 +64,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_matched_vars_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_list_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-x86_64.txt index 0eee5d32ff..f6d0d474c6 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.linux-x86_64.txt @@ -65,6 +65,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_matched_vars_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_list_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.windows-x86_64.txt b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.windows-x86_64.txt index 144f2f042a..48a437b923 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/minikql/comp_nodes/ut/CMakeLists.windows-x86_64.txt @@ -54,6 +54,7 @@ target_sources(ydb-library-yql-minikql-comp_nodes-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_grace_join_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_map_join_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_matched_vars_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_list_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_safe_circular_buffer_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_sort_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/comp_nodes/ut/mkql_switch_ut.cpp diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_list_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_list_ut.cpp new file mode 100644 index 0000000000..b276c19f69 --- /dev/null +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_list_ut.cpp @@ -0,0 +1,136 @@ +#include "../mkql_match_recognize_list.h" +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/computation/mkql_value_builder.h> +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NMiniKQL::NMatchRecognize { + +template<class L> +void CommonForSimpleAndSparse(const THolderFactory& holderFactory) { + using TList = L; + using TRange = typename L::TRange; + TList list; + TRange r; + for (ui64 i = 0; i != 10; ++i) { + r = list.Append(NUdf::TUnboxedValuePod{i}); + UNIT_ASSERT_VALUES_EQUAL(1, r.Size()); + NUdf::TUnboxedValue v = list.Get(i); + UNIT_ASSERT_VALUES_EQUAL(i, v.Get<ui64>()); + } + UNIT_ASSERT_VALUES_EQUAL(10, list.Size()); + { + auto r2 = list.Append(NUdf::TUnboxedValuePod{10}); + Y_UNUSED(r2); + r.Extend(); + } + UNIT_ASSERT_VALUES_EQUAL(11, list.Size()); + { + const NUdf::TUnboxedValue& v = list.Get(10); + UNIT_ASSERT_VALUES_EQUAL(10, v.Get<ui64>()); + } + //Test access via value + const NUdf::TUnboxedValue& listValue = holderFactory.Create<TListValue<L>>(list); + UNIT_ASSERT(listValue); + UNIT_ASSERT(listValue.HasValue()); + UNIT_ASSERT(listValue.HasListItems()); + UNIT_ASSERT(listValue.HasFastListLength()); + UNIT_ASSERT_VALUES_EQUAL(11, listValue.GetListLength()); + TDefaultValueBuilder valueBuilder(holderFactory); + auto listValueAsDict = NUdf::TBoxedValueAccessor::ToIndexDictImpl(*listValue.AsBoxed(), TDefaultValueBuilder(holderFactory)); + { + const NUdf::TUnboxedValue &v = NUdf::TBoxedValueAccessor::Lookup(*listValueAsDict, NUdf::TUnboxedValuePod{9}); + UNIT_ASSERT_VALUES_EQUAL(9, v.Get<ui64>()); + } + { + const NUdf::TUnboxedValue &v = NUdf::TBoxedValueAccessor::Lookup(*listValueAsDict, NUdf::TUnboxedValuePod{10}); + UNIT_ASSERT_VALUES_EQUAL(10, v.Get<ui64>()); + } +} + +Y_UNIT_TEST_SUITE(MatchRecognizeList) { + TMemoryUsageInfo memUsage("MatchRecognizeListTest"); + Y_UNIT_TEST(SimpleListCommon) { + TScopedAlloc alloc(__LOCATION__); + THolderFactory holderFactory(alloc.Ref(), memUsage); + CommonForSimpleAndSparse<TSimpleList>(holderFactory); + } + Y_UNIT_TEST(SparseListCommon) { + TScopedAlloc alloc(__LOCATION__); + THolderFactory holderFactory(alloc.Ref(), memUsage); + CommonForSimpleAndSparse<TSparseList>(holderFactory); + } + Y_UNIT_TEST(SimpleListSpecific) { + TScopedAlloc alloc(__LOCATION__); + THolderFactory holderFactory(alloc.Ref(), memUsage); + TSimpleList list; + for (ui64 i = 0; i != 10; ++i) { + list.Append(NUdf::TUnboxedValuePod{i}); + } + //All added items are accessible regardless of held ranges(locks) + for (ui64 i = 0; i != 10; ++i) { + NUdf::TUnboxedValue v = list.Get(i); + UNIT_ASSERT_VALUES_EQUAL(i, v.Get<ui64>()); + } + } + Y_UNIT_TEST(SparseListSpecific) { + TScopedAlloc alloc(__LOCATION__); + THolderFactory holderFactory(alloc.Ref(), memUsage); + TSparseList list; + //Add 10 items + for (ui64 i = 0; i != 10; ++i) { + list.Append(NUdf::TUnboxedValuePod{i}); + } + //Check no one is stored + UNIT_ASSERT_VALUES_EQUAL(0, list.Filled()); + for (ui64 i = 0; i != 10; ++i) { + NUdf::TUnboxedValue v = list.Get(i); + UNIT_ASSERT(!v); + } + //Add another 10 items and lock the last item added at every iteration + TSparseList::TRange r; + for (ui64 i = 10; i != 20; ++i) { + r = list.Append(NUdf::TUnboxedValuePod{i}); + } + //Check that only the last is stored + UNIT_ASSERT_VALUES_EQUAL(1, list.Filled()); + for (ui64 i = 0; i != 19; ++i) { + NUdf::TUnboxedValue v = list.Get(i); + UNIT_ASSERT(!v); + } + { + NUdf::TUnboxedValue v = list.Get(19); + UNIT_ASSERT_VALUES_EQUAL(19, v.Get<ui64>()); + } + + //Test copy and assignment for locks + TSparseList::TRange copiedRange{r}; + TSparseList::TRange assignedRange{r}; + assignedRange = copiedRange; + UNIT_ASSERT_VALUES_EQUAL(1, list.Filled()); + { + NUdf::TUnboxedValue v = list.Get(19); + UNIT_ASSERT_VALUES_EQUAL(19, v.Get<ui64>()); + } + r.Release(); + UNIT_ASSERT_VALUES_EQUAL(1, list.Filled()); + { + NUdf::TUnboxedValue v = list.Get(19); + UNIT_ASSERT_VALUES_EQUAL(19, v.Get<ui64>()); + } + UNIT_ASSERT_VALUES_EQUAL(1, list.Filled()); + copiedRange.Release(); + UNIT_ASSERT_VALUES_EQUAL(1, list.Filled()); + { + NUdf::TUnboxedValue v = list.Get(19); + UNIT_ASSERT_VALUES_EQUAL(19, v.Get<ui64>()); + } + assignedRange.Release(); + UNIT_ASSERT_VALUES_EQUAL(0, list.Filled()); + { + NUdf::TUnboxedValue v = list.Get(19); + UNIT_ASSERT(!v); + } + } +} + +}//namespace NKikimr::NMiniKQL::TMatchRecognize diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_matched_vars_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_matched_vars_ut.cpp index acfa9a05e2..92716e5faa 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_matched_vars_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_match_recognize_matched_vars_ut.cpp @@ -1,60 +1,71 @@ #include "../mkql_match_recognize_matched_vars.h" +#include "../mkql_match_recognize_list.h" #include <library/cpp/testing/unittest/registar.h> namespace NKikimr::NMiniKQL::NMatchRecognize { Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarExtend) { + using TRange = TSimpleList::TRange; + using TMatchedVar = TMatchedVar<TRange>; + using TMatchedVars = TMatchedVars<TRange>; + Y_UNIT_TEST(MatchedRangeSingleton) { - TMatchedRange r{10}; + TRange r{10}; UNIT_ASSERT_VALUES_EQUAL(10, r.From()); UNIT_ASSERT_VALUES_EQUAL(10, r.To()); r.Extend(); UNIT_ASSERT_VALUES_EQUAL(10, r.From()); UNIT_ASSERT_VALUES_EQUAL(11, r.To()); } + Y_UNIT_TEST(MatchedRange) { - TMatchedRange r{10, 20}; + TRange r{10, 20}; UNIT_ASSERT_VALUES_EQUAL(10, r.From()); UNIT_ASSERT_VALUES_EQUAL(20, r.To()); r.Extend(); UNIT_ASSERT_VALUES_EQUAL(10, r.From()); UNIT_ASSERT_VALUES_EQUAL(21, r.To()); } + Y_UNIT_TEST(MatchedVarEmpty) { TMatchedVar v{}; - Extend(v, 10); + Extend(v, TRange{10}); UNIT_ASSERT_VALUES_EQUAL(1, v.size()); UNIT_ASSERT_VALUES_EQUAL(10, v[0].From()); UNIT_ASSERT_VALUES_EQUAL(10, v[0].To()); } + Y_UNIT_TEST(MatchedVarExtendSingletonContiguous) { - TMatchedVar v{TMatchedRange{10}}; - Extend(v, 11); + TMatchedVar v{TRange{10}}; + Extend(v, TRange{11}); UNIT_ASSERT_VALUES_EQUAL(1, v.size()); UNIT_ASSERT_VALUES_EQUAL(10, v[0].From()); UNIT_ASSERT_VALUES_EQUAL(11, v[0].To()); } + Y_UNIT_TEST(MatchedVarExtendSingletonWithGap) { - TMatchedVar v{TMatchedRange{10}}; - Extend(v, 20); + TMatchedVar v{TRange{10}}; + Extend(v, TRange{20}); UNIT_ASSERT_VALUES_EQUAL(2, v.size()); UNIT_ASSERT_VALUES_EQUAL(10, v[0].From()); UNIT_ASSERT_VALUES_EQUAL(10, v[0].To()); UNIT_ASSERT_VALUES_EQUAL(20, v[1].From()); UNIT_ASSERT_VALUES_EQUAL(20, v[1].To()); } + Y_UNIT_TEST(MatchedVarExtendContiguous) { - TMatchedVar v{TMatchedRange{10, 20}, TMatchedRange{30, 40}}; - Extend(v, 41); + TMatchedVar v{TRange{10, 20}, TRange{30, 40}}; + Extend(v, TRange{41}); UNIT_ASSERT_VALUES_EQUAL(2, v.size()); UNIT_ASSERT_VALUES_EQUAL(10, v[0].From()); UNIT_ASSERT_VALUES_EQUAL(20, v[0].To()); UNIT_ASSERT_VALUES_EQUAL(30, v[1].From()); UNIT_ASSERT_VALUES_EQUAL(41, v[1].To()); } + Y_UNIT_TEST(MatchedVarExtendWithGap) { - TMatchedVar v{TMatchedRange{10, 20}, TMatchedRange{30, 40}}; - Extend(v, 50); + TMatchedVar v{TRange{10, 20}, TRange{30, 40}}; + Extend(v, TRange{50}); UNIT_ASSERT_VALUES_EQUAL(3, v.size()); UNIT_ASSERT_VALUES_EQUAL(10, v[0].From()); UNIT_ASSERT_VALUES_EQUAL(20, v[0].To()); @@ -66,12 +77,16 @@ Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarExtend) { } Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarsToValue) { + using TRange = TSimpleList::TRange; + using TMatchedVar = TMatchedVar<TRange>; + using TMatchedVars = TMatchedVars<TRange>; TMemoryUsageInfo memUsage("MatchedVars"); + Y_UNIT_TEST(MatchedRange) { TScopedAlloc alloc(__LOCATION__); THolderFactory holderFactory(alloc.Ref(), memUsage); { - TMatchedRange r{10, 20}; + TRange r{10, 20}; const auto value = ToValue(holderFactory, r); const auto elems = value.GetElements(); UNIT_ASSERT(elems); @@ -84,7 +99,7 @@ Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarsToValue) { TScopedAlloc alloc(__LOCATION__); THolderFactory holderFactory(alloc.Ref(), memUsage); { - const auto value = ToValue(holderFactory, std::vector<TMatchedRange>{}); + const auto value = ToValue(holderFactory, TMatchedVar{}); UNIT_ASSERT(value); UNIT_ASSERT(!value.HasListItems()); UNIT_ASSERT(value.HasFastListLength()); @@ -101,9 +116,9 @@ Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarsToValue) { TScopedAlloc alloc(__LOCATION__); THolderFactory holderFactory(alloc.Ref(), memUsage); { - const auto value = ToValue(holderFactory, { - TMatchedRange{10, 30}, - TMatchedRange{40, 45}, + const auto value = ToValue(holderFactory, TMatchedVar{ + TRange{10, 30}, + TRange{40, 45}, }); UNIT_ASSERT(value); @@ -132,10 +147,10 @@ Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarsToValue) { TScopedAlloc alloc(__LOCATION__); THolderFactory holderFactory(alloc.Ref(), memUsage); { - const auto value = ToValue(holderFactory, { + const auto value = ToValue(holderFactory, TMatchedVars { {}, - {TMatchedRange{20, 25}}, - {TMatchedRange{10, 30}, TMatchedRange{40, 45}}, + {TRange{20, 25}}, + {TRange{10, 30}, TRange{40, 45}}, }); UNIT_ASSERT(value); const auto varElems = value.GetElements(); @@ -166,22 +181,30 @@ Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarsToValue) { } Y_UNIT_TEST_SUITE(MatchRecognizeMatchedVarsToValueByRef) { + using TRange = TSimpleList::TRange; + using TMatchedVar = TMatchedVar<TRange>; + using TMatchedVars = TMatchedVars<TRange>; TMemoryUsageInfo memUsage("MatchedVarsByRef"); + Y_UNIT_TEST(MatchedVarsEmpty) { TScopedAlloc alloc(__LOCATION__); + THolderFactory holderFactory(alloc.Ref(), memUsage); { TMatchedVars vars{}; - NUdf::TUnboxedValue value(NUdf::TUnboxedValuePod(new TMatchedVarsValue(&memUsage, vars))); + NUdf::TUnboxedValue value = holderFactory.Create<TMatchedVarsValue<TRange>>(holderFactory, vars); UNIT_ASSERT(value.HasValue()); } } + Y_UNIT_TEST(MatchedVars) { TScopedAlloc alloc(__LOCATION__); + THolderFactory holderFactory(alloc.Ref(), memUsage); { TMatchedVar A{{1, 4}, {7, 9}, {100, 200}}; TMatchedVar B{{1, 6}}; TMatchedVars vars{A, B}; - NUdf::TUnboxedValue value(NUdf::TUnboxedValuePod(new TMatchedVarsValue(&memUsage, vars))); + NUdf::TUnboxedValue value = holderFactory.Create<TMatchedVarsValue<TRange>>(holderFactory, vars); + Y_UNUSED(value); UNIT_ASSERT(value.HasValue()); auto a = value.GetElement(0); UNIT_ASSERT(a.HasValue()); diff --git a/ydb/library/yql/minikql/comp_nodes/ut/ya.make b/ydb/library/yql/minikql/comp_nodes/ut/ya.make index 4cdc8bf9b5..7d015819ac 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/ya.make +++ b/ydb/library/yql/minikql/comp_nodes/ut/ya.make @@ -41,6 +41,7 @@ SRCS( mkql_grace_join_ut.cpp mkql_map_join_ut.cpp mkql_match_recognize_matched_vars_ut.cpp + mkql_match_recognize_list_ut.cpp mkql_safe_circular_buffer_ut.cpp mkql_sort_ut.cpp mkql_switch_ut.cpp |