aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/computation/mkql_custom_list.cpp
diff options
context:
space:
mode:
authorvvvv <vvvv@yandex-team.com>2024-11-07 04:19:26 +0300
committervvvv <vvvv@yandex-team.com>2024-11-07 04:29:50 +0300
commit2661be00f3bc47590fda9218bf0386d6355c8c88 (patch)
tree3d316c07519191283d31c5f537efc6aabb42a2f0 /yql/essentials/minikql/computation/mkql_custom_list.cpp
parentcf2a23963ac10add28c50cc114fbf48953eca5aa (diff)
downloadydb-2661be00f3bc47590fda9218bf0386d6355c8c88.tar.gz
Moved yql/minikql YQL-19206
init [nodiff:caesar] commit_hash:d1182ef7d430ccf7e4d37ed933c7126d7bd5d6e4
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_custom_list.cpp')
-rw-r--r--yql/essentials/minikql/computation/mkql_custom_list.cpp135
1 files changed, 135 insertions, 0 deletions
diff --git a/yql/essentials/minikql/computation/mkql_custom_list.cpp b/yql/essentials/minikql/computation/mkql_custom_list.cpp
new file mode 100644
index 0000000000..6f8d68013b
--- /dev/null
+++ b/yql/essentials/minikql/computation/mkql_custom_list.cpp
@@ -0,0 +1,135 @@
+#include "mkql_custom_list.h"
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+TForwardListValue::TForwardListValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream)
+ : TCustomListValue(memInfo)
+ , Stream(std::move(stream))
+{
+ MKQL_ENSURE(Stream, "Empty stream.");
+}
+
+TForwardListValue::TIterator::TIterator(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream)
+ : TComputationValue(memInfo), Stream(std::move(stream))
+{}
+
+bool TForwardListValue::TIterator::Next(NUdf::TUnboxedValue& value) {
+ const auto status = Stream.Fetch(value);
+ MKQL_ENSURE(status != NUdf::EFetchStatus::Yield, "Unexpected stream status.");
+ return status == NUdf::EFetchStatus::Ok;
+}
+
+NUdf::TUnboxedValue TForwardListValue::GetListIterator() const {
+ MKQL_ENSURE(Stream, "Second pass for ForwardList");
+ return NUdf::TUnboxedValuePod(new TIterator(GetMemInfo(), std::move(Stream)));
+}
+
+TExtendListValue::TExtendListValue(TMemoryUsageInfo* memInfo, TUnboxedValueVector&& lists)
+ : TCustomListValue(memInfo)
+ , Lists(std::move(lists))
+{
+ MKQL_MEM_TAKE(memInfo, Lists.data(), Lists.capacity() * sizeof(NUdf::TUnboxedValue));
+ Y_ASSERT(!Lists.empty());
+}
+
+TExtendListValue::TIterator::TIterator(TMemoryUsageInfo* memInfo, TUnboxedValueVector&& iters)
+ : TComputationValue(memInfo)
+ , Iters(std::move(iters))
+ , Index(0)
+{
+ MKQL_MEM_TAKE(memInfo, Iters.data(), Iters.capacity() * sizeof(NUdf::TUnboxedValue));
+}
+
+TExtendListValue::TIterator::~TIterator()
+{
+ MKQL_MEM_RETURN(GetMemInfo(), Iters.data(), Iters.capacity() * sizeof(NUdf::TUnboxedValue));
+}
+
+bool TExtendListValue::TIterator::Next(NUdf::TUnboxedValue& value) {
+ for (; Index < Iters.size(); ++Index) {
+ if (Iters[Index].Next(value)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+bool TExtendListValue::TIterator::Skip() {
+ for (; Index < Iters.size(); ++Index) {
+ if (Iters[Index].Skip()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+NUdf::TUnboxedValue TExtendListValue::GetListIterator() const {
+ TUnboxedValueVector iters;
+ iters.reserve(Lists.size());
+ for (const auto& list : Lists) {
+ iters.emplace_back(list.GetListIterator());
+ }
+
+ return NUdf::TUnboxedValuePod(new TIterator(GetMemInfo(), std::move(iters)));
+}
+
+TExtendListValue::~TExtendListValue() {
+ MKQL_MEM_RETURN(GetMemInfo(), Lists.data(), Lists.capacity() * sizeof(NUdf::TUnboxedValue));
+}
+
+ui64 TExtendListValue::GetListLength() const {
+ if (!Length) {
+ ui64 length = 0ULL;
+ for (const auto& list : Lists) {
+ ui64 partialLength = list.GetListLength();
+ length += partialLength;
+ }
+
+ Length = length;
+ }
+
+ return *Length;
+}
+
+bool TExtendListValue::HasListItems() const {
+ if (!HasItems) {
+ for (const auto& list : Lists) {
+ if (list.HasListItems()) {
+ HasItems = true;
+ break;
+ }
+ }
+
+ if (!HasItems) {
+ HasItems = false;
+ }
+ }
+
+ return *HasItems;
+}
+
+TExtendStreamValue::TExtendStreamValue(TMemoryUsageInfo* memInfo, TUnboxedValueVector&& lists)
+ : TBase(memInfo)
+ , Lists(std::move(lists))
+{
+ MKQL_MEM_TAKE(memInfo, Lists.data(), Lists.capacity() * sizeof(NUdf::TUnboxedValue));
+ Y_ASSERT(!Lists.empty());
+}
+
+TExtendStreamValue::~TExtendStreamValue() {
+ MKQL_MEM_RETURN(GetMemInfo(), Lists.data(), Lists.capacity() * sizeof(NUdf::TUnboxedValue));
+}
+
+NUdf::EFetchStatus TExtendStreamValue::Fetch(NUdf::TUnboxedValue& value) {
+ for (; Index < Lists.size(); ++Index) {
+ const auto status = Lists[Index].Fetch(value);
+ if (status != NUdf::EFetchStatus::Finish) {
+ return status;
+ }
+ }
+ return NUdf::EFetchStatus::Finish;
+}
+
+}
+}