diff options
author | vvvv <vvvv@yandex-team.com> | 2024-11-07 04:19:26 +0300 |
---|---|---|
committer | vvvv <vvvv@yandex-team.com> | 2024-11-07 04:29:50 +0300 |
commit | 2661be00f3bc47590fda9218bf0386d6355c8c88 (patch) | |
tree | 3d316c07519191283d31c5f537efc6aabb42a2f0 /yql/essentials/minikql/computation/mkql_custom_list.cpp | |
parent | cf2a23963ac10add28c50cc114fbf48953eca5aa (diff) | |
download | ydb-2661be00f3bc47590fda9218bf0386d6355c8c88.tar.gz |
Moved yql/minikql YQL-19206
init
[nodiff:caesar]
commit_hash:d1182ef7d430ccf7e4d37ed933c7126d7bd5d6e4
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_custom_list.cpp')
-rw-r--r-- | yql/essentials/minikql/computation/mkql_custom_list.cpp | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/yql/essentials/minikql/computation/mkql_custom_list.cpp b/yql/essentials/minikql/computation/mkql_custom_list.cpp new file mode 100644 index 0000000000..6f8d68013b --- /dev/null +++ b/yql/essentials/minikql/computation/mkql_custom_list.cpp @@ -0,0 +1,135 @@ +#include "mkql_custom_list.h" + +namespace NKikimr { +namespace NMiniKQL { + +TForwardListValue::TForwardListValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream) + : TCustomListValue(memInfo) + , Stream(std::move(stream)) +{ + MKQL_ENSURE(Stream, "Empty stream."); +} + +TForwardListValue::TIterator::TIterator(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream) + : TComputationValue(memInfo), Stream(std::move(stream)) +{} + +bool TForwardListValue::TIterator::Next(NUdf::TUnboxedValue& value) { + const auto status = Stream.Fetch(value); + MKQL_ENSURE(status != NUdf::EFetchStatus::Yield, "Unexpected stream status."); + return status == NUdf::EFetchStatus::Ok; +} + +NUdf::TUnboxedValue TForwardListValue::GetListIterator() const { + MKQL_ENSURE(Stream, "Second pass for ForwardList"); + return NUdf::TUnboxedValuePod(new TIterator(GetMemInfo(), std::move(Stream))); +} + +TExtendListValue::TExtendListValue(TMemoryUsageInfo* memInfo, TUnboxedValueVector&& lists) + : TCustomListValue(memInfo) + , Lists(std::move(lists)) +{ + MKQL_MEM_TAKE(memInfo, Lists.data(), Lists.capacity() * sizeof(NUdf::TUnboxedValue)); + Y_ASSERT(!Lists.empty()); +} + +TExtendListValue::TIterator::TIterator(TMemoryUsageInfo* memInfo, TUnboxedValueVector&& iters) + : TComputationValue(memInfo) + , Iters(std::move(iters)) + , Index(0) +{ + MKQL_MEM_TAKE(memInfo, Iters.data(), Iters.capacity() * sizeof(NUdf::TUnboxedValue)); +} + +TExtendListValue::TIterator::~TIterator() +{ + MKQL_MEM_RETURN(GetMemInfo(), Iters.data(), Iters.capacity() * sizeof(NUdf::TUnboxedValue)); +} + +bool TExtendListValue::TIterator::Next(NUdf::TUnboxedValue& value) { + for (; Index < Iters.size(); ++Index) { + if (Iters[Index].Next(value)) { + return true; + } + } + return false; +} + +bool TExtendListValue::TIterator::Skip() { + for (; Index < Iters.size(); ++Index) { + if (Iters[Index].Skip()) { + return true; + } + } + return false; +} + +NUdf::TUnboxedValue TExtendListValue::GetListIterator() const { + TUnboxedValueVector iters; + iters.reserve(Lists.size()); + for (const auto& list : Lists) { + iters.emplace_back(list.GetListIterator()); + } + + return NUdf::TUnboxedValuePod(new TIterator(GetMemInfo(), std::move(iters))); +} + +TExtendListValue::~TExtendListValue() { + MKQL_MEM_RETURN(GetMemInfo(), Lists.data(), Lists.capacity() * sizeof(NUdf::TUnboxedValue)); +} + +ui64 TExtendListValue::GetListLength() const { + if (!Length) { + ui64 length = 0ULL; + for (const auto& list : Lists) { + ui64 partialLength = list.GetListLength(); + length += partialLength; + } + + Length = length; + } + + return *Length; +} + +bool TExtendListValue::HasListItems() const { + if (!HasItems) { + for (const auto& list : Lists) { + if (list.HasListItems()) { + HasItems = true; + break; + } + } + + if (!HasItems) { + HasItems = false; + } + } + + return *HasItems; +} + +TExtendStreamValue::TExtendStreamValue(TMemoryUsageInfo* memInfo, TUnboxedValueVector&& lists) + : TBase(memInfo) + , Lists(std::move(lists)) +{ + MKQL_MEM_TAKE(memInfo, Lists.data(), Lists.capacity() * sizeof(NUdf::TUnboxedValue)); + Y_ASSERT(!Lists.empty()); +} + +TExtendStreamValue::~TExtendStreamValue() { + MKQL_MEM_RETURN(GetMemInfo(), Lists.data(), Lists.capacity() * sizeof(NUdf::TUnboxedValue)); +} + +NUdf::EFetchStatus TExtendStreamValue::Fetch(NUdf::TUnboxedValue& value) { + for (; Index < Lists.size(); ++Index) { + const auto status = Lists[Index].Fetch(value); + if (status != NUdf::EFetchStatus::Finish) { + return status; + } + } + return NUdf::EFetchStatus::Finish; +} + +} +} |