aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials
diff options
context:
space:
mode:
authorAlexander Smirnov <alex@ydb.tech>2025-03-04 16:15:41 +0000
committerAlexander Smirnov <alex@ydb.tech>2025-03-04 16:15:41 +0000
commitb21a377d1f5b24149cf65fd1f8feb44411ae38f9 (patch)
tree0459a651275d60cf60489d8142f20a8bd5e6a199 /yql/essentials
parent827cd39b843ead1adfaa20f8a55e2e17da62a4eb (diff)
parent00325857a11f51ad6b43a4d35f57e85e06866ab6 (diff)
downloadydb-b21a377d1f5b24149cf65fd1f8feb44411ae38f9.tar.gz
Merge pull request #15307 from ydb-platform/merge-libs-250304-1328
Diffstat (limited to 'yql/essentials')
-rw-r--r--yql/essentials/core/facade/yql_facade.cpp1
-rw-r--r--yql/essentials/linters.make.inc0
-rw-r--r--yql/essentials/minikql/aligned_page_pool.cpp47
-rw-r--r--yql/essentials/minikql/aligned_page_pool.h6
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_apply.cpp17
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_dictitems.cpp19
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_factory.cpp1
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_replicate.cpp13
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_chopper_ut.cpp4
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_combine_ut.cpp4
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_flatmap_ut.cpp3
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_join_ut.cpp4
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_map_join_ut.cpp4
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_multimap_ut.cpp3
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_wide_chopper_ut.cpp4
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp3
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_wide_condense_ut.cpp4
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_wide_filter_ut.cpp4
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_wide_map_ut.cpp4
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_wide_nodes_ut.cpp4
-rw-r--r--yql/essentials/minikql/computation/mkql_block_reader.cpp18
-rw-r--r--yql/essentials/minikql/computation/mkql_block_transport.cpp56
-rw-r--r--yql/essentials/minikql/computation/mkql_block_trimmer.cpp16
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp15
-rw-r--r--yql/essentials/minikql/mkql_alloc.cpp97
-rw-r--r--yql/essentials/minikql/mkql_alloc.h9
-rw-r--r--yql/essentials/minikql/mkql_program_builder.cpp161
-rw-r--r--yql/essentials/minikql/mkql_type_builder.cpp43
-rw-r--r--yql/essentials/providers/common/mkql/yql_provider_mkql.cpp14
-rw-r--r--yql/essentials/providers/common/mkql/yql_type_mkql.cpp10
-rw-r--r--yql/essentials/providers/common/schema/mkql/yql_mkql_schema.cpp8
-rw-r--r--yql/essentials/public/issue/yql_warning.cpp8
-rw-r--r--yql/essentials/public/issue/yql_warning.h3
-rw-r--r--yql/essentials/public/udf/arrow/block_builder.h53
-rw-r--r--yql/essentials/public/udf/arrow/block_item.h12
-rw-r--r--yql/essentials/public/udf/arrow/block_item_comparator.h18
-rw-r--r--yql/essentials/public/udf/arrow/block_item_hasher.h8
-rw-r--r--yql/essentials/public/udf/arrow/block_reader.h51
-rw-r--r--yql/essentials/public/udf/arrow/dispatch_traits.h18
-rw-r--r--yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp40
-rw-r--r--yql/essentials/public/udf/arrow/util.h15
-rw-r--r--yql/essentials/sql/settings/translation_settings.h1
-rw-r--r--yql/essentials/sql/v1/context.cpp1
-rw-r--r--yql/essentials/tests/sql/minirun/part0/canondata/result.json21
-rw-r--r--yql/essentials/tests/sql/minirun/part2/canondata/result.json21
-rw-r--r--yql/essentials/tests/sql/minirun/part7/canondata/result.json2
-rw-r--r--yql/essentials/tests/sql/minirun/part8/canondata/result.json42
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/result.json50
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_key_/formatted.sql72
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_key_optional_/formatted.sql72
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_value_/formatted.sql33
-rw-r--r--yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_value_optional_/formatted.sql33
-rw-r--r--yql/essentials/tests/sql/suites/blocks/agg_singular_type_key.sql23
-rw-r--r--yql/essentials/tests/sql/suites/blocks/agg_singular_type_key_optional.sql24
-rw-r--r--yql/essentials/tests/sql/suites/blocks/agg_singular_type_value.sql26
-rw-r--r--yql/essentials/tests/sql/suites/blocks/agg_singular_type_value_optional.sql27
-rw-r--r--yql/essentials/types/binary_json/ut/entry_ut.cpp23
-rw-r--r--yql/essentials/types/binary_json/write.cpp39
-rw-r--r--yql/essentials/types/binary_json/write.h11
-rw-r--r--yql/essentials/udfs/language/yql/yql_language_udf.cpp38
60 files changed, 1099 insertions, 282 deletions
diff --git a/yql/essentials/core/facade/yql_facade.cpp b/yql/essentials/core/facade/yql_facade.cpp
index f8d9c28ba1..547c3a292e 100644
--- a/yql/essentials/core/facade/yql_facade.cpp
+++ b/yql/essentials/core/facade/yql_facade.cpp
@@ -691,6 +691,7 @@ void TProgram::HandleTranslationSettings(NSQLTranslation::TTranslationSettings&
loadedSettings.V0WarnAsError = NSQLTranslation::ISqlFeaturePolicy::Make(dataNode["V0WarnAsError"].AsBool());
loadedSettings.DqDefaultAuto = NSQLTranslation::ISqlFeaturePolicy::Make(dataNode["DqDefaultAuto"].AsBool());
loadedSettings.BlockDefaultAuto = NSQLTranslation::ISqlFeaturePolicy::Make(dataNode["BlockDefaultAuto"].AsBool());
+ loadedSettings.IsReplay = true;
currentSettings = &loadedSettings;
}
}
diff --git a/yql/essentials/linters.make.inc b/yql/essentials/linters.make.inc
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/yql/essentials/linters.make.inc
diff --git a/yql/essentials/minikql/aligned_page_pool.cpp b/yql/essentials/minikql/aligned_page_pool.cpp
index 04aab1bd8a..bcbb3edd11 100644
--- a/yql/essentials/minikql/aligned_page_pool.cpp
+++ b/yql/essentials/minikql/aligned_page_pool.cpp
@@ -783,6 +783,40 @@ void* GetAlignedPage(ui64 size) {
}
template<typename TMmap>
+void* GetAlignedPage() {
+ const auto size = TAlignedPagePool::POOL_PAGE_SIZE;
+ auto& globalPool = TGlobalPools<TMmap, false>::Instance();
+
+ if (auto* page = globalPool.Get(0).GetPage()) {
+ return page;
+ }
+
+ auto allocSize = size * 2;
+ void* unalignedPtr = globalPool.DoMmap(allocSize);
+ if (Y_UNLIKELY(MAP_FAILED == unalignedPtr)) {
+ TStringStream mmaps;
+ const auto lastError = LastSystemError();
+ if (lastError == ENOMEM) {
+ mmaps << GetMemoryMapsString();
+ }
+
+ ythrow yexception() << "Mmap failed to allocate " << allocSize << " bytes: "
+ << LastSystemErrorText(lastError) << mmaps.Str();
+ }
+
+ void* page = AlignUp(unalignedPtr, size);
+
+ // Unmap unaligned prefix before offset and tail after aligned page
+ const size_t offset = (intptr_t)page - (intptr_t)unalignedPtr;
+ if (Y_UNLIKELY(offset)) {
+ globalPool.DoMunmap(unalignedPtr, offset);
+ globalPool.DoMunmap((ui8*)page + size, size - offset);
+ }
+
+ return page;
+}
+
+template<typename TMmap>
void ReleaseAlignedPage(void* mem, ui64 size) {
size = AlignUp(size, SYS_PAGE_SIZE);
if (size < TAlignedPagePool::POOL_PAGE_SIZE) {
@@ -801,6 +835,11 @@ void ReleaseAlignedPage(void* mem, ui64 size) {
}
template<typename TMmap>
+void ReleaseAlignedPage(void* ptr) {
+ TGlobalPools<TMmap, false>::Instance().PushPage(0, ptr);
+}
+
+template<typename TMmap>
i64 GetTotalMmapedBytes() {
return TGlobalPools<TMmap, true>::Instance().GetTotalMmappedBytes() + TGlobalPools<TMmap, false>::Instance().GetTotalMmappedBytes();
}
@@ -822,10 +861,18 @@ template void* GetAlignedPage<>(ui64);
template void* GetAlignedPage<TFakeAlignedMmap>(ui64);
template void* GetAlignedPage<TFakeUnalignedMmap>(ui64);
+template void* GetAlignedPage<>();
+template void* GetAlignedPage<TFakeAlignedMmap>();
+template void* GetAlignedPage<TFakeUnalignedMmap>();
+
template void ReleaseAlignedPage<>(void*,ui64);
template void ReleaseAlignedPage<TFakeAlignedMmap>(void*,ui64);
template void ReleaseAlignedPage<TFakeUnalignedMmap>(void*,ui64);
+template void ReleaseAlignedPage<>(void*);
+template void ReleaseAlignedPage<TFakeAlignedMmap>(void*);
+template void ReleaseAlignedPage<TFakeUnalignedMmap>(void*);
+
size_t GetMemoryMapsCount() {
size_t lineCount = 0;
TString line;
diff --git a/yql/essentials/minikql/aligned_page_pool.h b/yql/essentials/minikql/aligned_page_pool.h
index 4a5b1d2e55..511b99b4d7 100644
--- a/yql/essentials/minikql/aligned_page_pool.h
+++ b/yql/essentials/minikql/aligned_page_pool.h
@@ -309,9 +309,15 @@ template<typename TMmap = TSystemMmap>
void* GetAlignedPage(ui64 size);
template<typename TMmap = TSystemMmap>
+void* GetAlignedPage();
+
+template<typename TMmap = TSystemMmap>
void ReleaseAlignedPage(void* mem, ui64 size);
template<typename TMmap = TSystemMmap>
+void ReleaseAlignedPage(void* mem);
+
+template<typename TMmap = TSystemMmap>
i64 GetTotalMmapedBytes();
template<typename TMmap = TSystemMmap>
i64 GetTotalFreeListBytes();
diff --git a/yql/essentials/minikql/comp_nodes/mkql_apply.cpp b/yql/essentials/minikql/comp_nodes/mkql_apply.cpp
index 180a38176b..60fbe82ad4 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_apply.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_apply.cpp
@@ -206,9 +206,8 @@ private:
}
IComputationNode* WrapApply(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- const bool withPos = callable.GetType()->GetName() == "Apply2";
- const ui32 deltaArgs = withPos ? 3 : 0;
- MKQL_ENSURE(callable.GetInputsCount() >= 2 + deltaArgs, "Expected at least " << (2 + deltaArgs) << " arguments");
+ MKQL_ENSURE(callable.GetInputsCount() >= 5, "Expected at least 5 arguments");
+ constexpr size_t posArgs = 3;
const auto function = callable.GetInput(0);
MKQL_ENSURE(!function.IsImmediate() && function.GetNode()->GetType()->IsCallable(),
@@ -218,11 +217,11 @@ IComputationNode* WrapApply(TCallable& callable, const TComputationNodeFactoryCo
const auto returnType = functionCallable->GetType()->GetReturnType();
MKQL_ENSURE(returnType->IsCallable(), "Expected callable as return type");
- const TStringBuf file = withPos ? AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().AsStringRef() : NUdf::TStringRef();
- const ui32 row = withPos ? AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<ui32>() : 0;
- const ui32 column = withPos ? AS_VALUE(TDataLiteral, callable.GetInput(4))->AsValue().Get<ui32>() : 0;
+ const TStringBuf file = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().AsStringRef();
+ const ui32 row = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<ui32>();
+ const ui32 column = AS_VALUE(TDataLiteral, callable.GetInput(4))->AsValue().Get<ui32>();
- const ui32 inputsCount = callable.GetInputsCount() - deltaArgs;
+ const ui32 inputsCount = callable.GetInputsCount() - posArgs;
const ui32 argsCount = inputsCount - 2;
const ui32 dependentCount = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>();
@@ -235,11 +234,11 @@ IComputationNode* WrapApply(TCallable& callable, const TComputationNodeFactoryCo
TComputationNodePtrVector argNodes(callableType->GetArgumentsCount() + dependentCount);
for (ui32 i = 2; i < 2 + usedArgs; ++i) {
- argNodes[i - 2] = LocateNode(ctx.NodeLocator, callable, i + deltaArgs);
+ argNodes[i - 2] = LocateNode(ctx.NodeLocator, callable, i + posArgs);
}
for (ui32 i = 2 + usedArgs; i < inputsCount; ++i) {
- argNodes[callableType->GetArgumentsCount() + i - 2 - usedArgs] = LocateNode(ctx.NodeLocator, callable, i + deltaArgs);
+ argNodes[callableType->GetArgumentsCount() + i - 2 - usedArgs] = LocateNode(ctx.NodeLocator, callable, i + posArgs);
}
auto functionNode = LocateNode(ctx.NodeLocator, callable, 0);
diff --git a/yql/essentials/minikql/comp_nodes/mkql_dictitems.cpp b/yql/essentials/minikql/comp_nodes/mkql_dictitems.cpp
index 8790089972..295bee402b 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_dictitems.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_dictitems.cpp
@@ -277,24 +277,9 @@ private:
}
IComputationNode* WrapDictItems(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 1 || callable.GetInputsCount() == 2, "Expected one or two args");
+ MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected one arg");
const auto node = LocateNode(ctx.NodeLocator, callable, 0);
-
- if (1U == callable.GetInputsCount()) {
- return new TDictItemsWrapper(ctx.Mutables, node);
- }
-
- const auto mode = AS_VALUE(TDataLiteral, callable.GetInput(1))->AsValue().Get<ui32>();
- switch (static_cast<EDictItems>(mode)) {
- case EDictItems::Both:
- return new TDictItemsWrapper(ctx.Mutables, node);
- case EDictItems::Keys:
- return new TDictHalfsWrapper<true>(ctx.Mutables, node);
- case EDictItems::Payloads:
- return new TDictHalfsWrapper<false>(ctx.Mutables, node);
- default:
- Y_ABORT("Unknown mode: %" PRIu32, mode);
- }
+ return new TDictItemsWrapper(ctx.Mutables, node);
}
IComputationNode* WrapDictKeys(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
diff --git a/yql/essentials/minikql/comp_nodes/mkql_factory.cpp b/yql/essentials/minikql/comp_nodes/mkql_factory.cpp
index 3b2119ba21..c6b198463b 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_factory.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_factory.cpp
@@ -219,7 +219,6 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"Invoke", &WrapInvoke},
{"Udf", &WrapUdf},
{"ScriptUdf", &WrapScriptUdf},
- {"Apply", &WrapApply},
{"Apply2", &WrapApply},
{"Callable", &WrapCallable},
{"Size", &WrapSize},
diff --git a/yql/essentials/minikql/comp_nodes/mkql_replicate.cpp b/yql/essentials/minikql/comp_nodes/mkql_replicate.cpp
index 81996e07bf..2adac528b9 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_replicate.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_replicate.cpp
@@ -227,20 +227,17 @@ private:
}
IComputationNode* WrapReplicate(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 2 || callable.GetInputsCount() == 5, "Expected 2 or 5 args");
+ MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args");
const auto countType = AS_TYPE(TDataType, callable.GetInput(1));
MKQL_ENSURE(countType->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected ui64");
const auto list = LocateNode(ctx.NodeLocator, callable, 0);
const auto count = LocateNode(ctx.NodeLocator, callable, 1);
- NUdf::TSourcePosition pos;
- if (callable.GetInputsCount() == 5) {
- const TStringBuf file = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().AsStringRef();
- const ui32 row = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<ui32>();
- const ui32 column = AS_VALUE(TDataLiteral, callable.GetInput(4))->AsValue().Get<ui32>();
- pos = NUdf::TSourcePosition(row, column, file);
- }
+ const TStringBuf file = AS_VALUE(TDataLiteral, callable.GetInput(2))->AsValue().AsStringRef();
+ const ui32 row = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<ui32>();
+ const ui32 column = AS_VALUE(TDataLiteral, callable.GetInput(4))->AsValue().Get<ui32>();
+ const NUdf::TSourcePosition pos = NUdf::TSourcePosition(row, column, file);
return new TReplicateWrapper(ctx.Mutables, list, count, pos);
}
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_chopper_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_chopper_ut.cpp
index 8e314cfb9e..811927b14c 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_chopper_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_chopper_ut.cpp
@@ -297,7 +297,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLChopperStreamTest) {
UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|");
}
}
-#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 9u
+
Y_UNIT_TEST_SUITE(TMiniKQLChopperFlowTest) {
Y_UNIT_TEST_LLVM(TestEmpty) {
TSetup<LLVM> setup;
@@ -482,6 +482,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLChopperFlowTest) {
UNIT_ASSERT_VALUES_EQUAL(TStringBuf(result.AsStringRef()), "|0|0|0|0|0|");
}
}
-#endif
+
} // NMiniKQL
} // NKikimr
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_combine_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_combine_ut.cpp
index 55cea5f9fa..ab80f54e00 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_combine_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_combine_ut.cpp
@@ -799,7 +799,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCombineStreamPerfTest) {
Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
}
}
-#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 3u
+
Y_UNIT_TEST_SUITE(TMiniKQLCombineFlowTest) {
Y_UNIT_TEST_LLVM(TestFullCombineWithOptOut) {
TSetup<LLVM> setup(GetNodeFactory());
@@ -1530,6 +1530,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLCombineFlowPerfTest) {
Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
}
}
-#endif
+
} // NMiniKQL
} // NKikimr
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_flatmap_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_flatmap_ut.cpp
index 12c11e4813..e8282e94d7 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_flatmap_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_flatmap_ut.cpp
@@ -522,7 +522,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLFlatMapTest) {
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}
-#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
+
Y_UNIT_TEST_LLVM(TestNarrowWithList) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -846,7 +846,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLFlatMapTest) {
UNIT_ASSERT_VALUES_EQUAL(NUdf::EFetchStatus::Finish, iterator.Fetch(item));
UNIT_ASSERT_VALUES_EQUAL(NUdf::EFetchStatus::Finish, iterator.Fetch(item));
}
-#endif
}
}
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_join_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_join_ut.cpp
index d404644972..3731ff1539 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_join_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_join_ut.cpp
@@ -116,7 +116,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreTupleTest) {
UNIT_ASSERT(!iterator.Next(item));
}
}
-#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
+
Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreWideTest) {
Y_UNIT_TEST_LLVM(Inner) {
TSetup<LLVM> setup;
@@ -323,7 +323,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLCommonJoinCoreWideTest) {
UNIT_ASSERT(!iterator.Next(item));
}
}
-#endif
+
}
}
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_map_join_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_map_join_ut.cpp
index e68d4bd46e..fe734804ea 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_map_join_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_map_join_ut.cpp
@@ -585,7 +585,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLMapJoinCoreTest) {
}
}
}
-#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
+
Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) {
Y_UNIT_TEST_LLVM(TestInner) {
for (ui32 pass = 0; pass < 1; ++pass) {
@@ -1144,7 +1144,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapJoinCoreTest) {
}
}
}
-#endif
+
}
}
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_multimap_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_multimap_ut.cpp
index 5dcb30db2e..d6735151f6 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_multimap_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_multimap_ut.cpp
@@ -121,7 +121,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiMapTest) {
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}
-#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
+
Y_UNIT_TEST_LLVM(TestFlattenByNarrow) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -164,7 +164,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLMultiMapTest) {
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}
-#endif
}
}
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_chopper_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_chopper_ut.cpp
index 554dd8a3d3..95ce34a54c 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_chopper_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_chopper_ut.cpp
@@ -3,7 +3,7 @@
namespace NKikimr {
namespace NMiniKQL {
-#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
+
Y_UNIT_TEST_SUITE(TMiniKQLWideChopperTest) {
Y_UNIT_TEST_LLVM(TestConcatKeyToItems) {
TSetup<LLVM> setup;
@@ -468,7 +468,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideChopperTest) {
UNIT_ASSERT(!iterator.Next(item));
}
}
-#endif
+
}
}
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
index 55cb6babbf..4e4d13163c 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_combine_ut.cpp
@@ -155,7 +155,6 @@ void CheckIfStreamHasExpectedStringValues(const NUdf::TUnboxedValue& streamValue
} // unnamed
-#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerTest) {
Y_UNIT_TEST_LLVM(TestLongStringsRefCounting) {
TSetup<LLVM> setup;
@@ -1065,7 +1064,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCombinerPerfTest) {
Cerr << "Runtime is " << t2 - t1 << " vs C++ " << cppTime << Endl;
}
}
-#endif
+
#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 29u
Y_UNIT_TEST_SUITE(TMiniKQLWideLastCombinerTest) {
Y_UNIT_TEST_LLVM_SPILLING(TestLongStringsRefCounting) {
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_condense_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_condense_ut.cpp
index b231df462b..1313812732 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_condense_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_condense_ut.cpp
@@ -3,7 +3,7 @@
namespace NKikimr {
namespace NMiniKQL {
-#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
+
Y_UNIT_TEST_SUITE(TMiniKQLWideCondense1Test) {
Y_UNIT_TEST_LLVM(TestConcatItemsToKey) {
TSetup<LLVM> setup;
@@ -167,7 +167,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideCondense1Test) {
UNIT_ASSERT(!iterator.Next(item));
}
}
-#endif
+
}
}
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_filter_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_filter_ut.cpp
index 15c8516089..fd7b3f2f23 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_filter_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_filter_ut.cpp
@@ -3,7 +3,7 @@
namespace NKikimr {
namespace NMiniKQL {
-#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
+
Y_UNIT_TEST_SUITE(TMiniKQLWideFilterTest) {
Y_UNIT_TEST_LLVM(TestPredicateExpression) {
TSetup<LLVM> setup;
@@ -381,6 +381,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideFilterTest) {
UNIT_ASSERT(!iterator.Next(item));
}
}
-#endif
+
}
}
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_map_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_map_ut.cpp
index d1afec27f4..97c7d82d7b 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_map_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_map_ut.cpp
@@ -3,7 +3,7 @@
namespace NKikimr {
namespace NMiniKQL {
-#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
+
Y_UNIT_TEST_SUITE(TMiniKQLWideMapTest) {
Y_UNIT_TEST_LLVM(TestSimpleSwap) {
TSetup<LLVM> setup;
@@ -330,6 +330,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideMapTest) {
UNIT_ASSERT(!iterator.Next(item));
}
}
-#endif
+
}
}
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_nodes_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_nodes_ut.cpp
index 093810f0bc..1cbe7d774b 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_wide_nodes_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_wide_nodes_ut.cpp
@@ -3,7 +3,7 @@
namespace NKikimr {
namespace NMiniKQL {
-#if !defined(MKQL_RUNTIME_VERSION) || MKQL_RUNTIME_VERSION >= 18u
+
Y_UNIT_TEST_SUITE(TMiniKQLWideNodesTest) {
// TDOD: fixme
#if 0
@@ -119,6 +119,6 @@ Y_UNIT_TEST_SUITE(TMiniKQLWideNodesTest) {
}
}
-#endif
+
}
}
diff --git a/yql/essentials/minikql/computation/mkql_block_reader.cpp b/yql/essentials/minikql/computation/mkql_block_reader.cpp
index 4e2060e739..5886e121c4 100644
--- a/yql/essentials/minikql/computation/mkql_block_reader.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_reader.cpp
@@ -162,6 +162,19 @@ private:
i32 TypeLen = 0;
};
+class TSingularTypeItemConverter: public IBlockItemConverter {
+public:
+ NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final {
+ Y_UNUSED(item, holderFactory);
+ return NUdf::TUnboxedValuePod::Zero();
+ }
+
+ TBlockItem MakeItem(const NUdf::TUnboxedValuePod& value) const final {
+ Y_UNUSED(value);
+ return TBlockItem::Zero();
+ }
+};
+
template <bool Nullable>
class TTupleBlockItemConverter : public IBlockItemConverter {
public:
@@ -285,6 +298,7 @@ struct TConverterTraits {
using TExtOptional = TExternalOptionalBlockItemConverter;
template<typename TTzDate, bool Nullable>
using TTzDateConverter = TTzDateBlockItemConverter<TTzDate, Nullable>;
+ using TSingularType = TSingularTypeItemConverter;
constexpr static bool PassType = false;
@@ -325,6 +339,10 @@ struct TConverterTraits {
return std::make_unique<TTzDateConverter<TTzDate, false>>();
}
}
+
+ static std::unique_ptr<TResult> MakeSingular() {
+ return std::make_unique<TSingularType>();
+ }
};
} // namespace
diff --git a/yql/essentials/minikql/computation/mkql_block_transport.cpp b/yql/essentials/minikql/computation/mkql_block_transport.cpp
index a03a5027e8..766df54889 100644
--- a/yql/essentials/minikql/computation/mkql_block_transport.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_transport.cpp
@@ -13,7 +13,7 @@ namespace {
using NYql::TChunkedBuffer;
TChunkedBuffer MakeChunkedBufferAndUntrack(const std::shared_ptr<const arrow::Buffer>& owner, const char* data, size_t size) {
- MKQLArrowUntrack(owner->data());
+ MKQLArrowUntrack(owner->data(), owner->capacity());
return TChunkedBuffer(TStringBuf{data, size}, owner);
}
@@ -429,6 +429,49 @@ private:
const std::unique_ptr<TBlockDeserializerBase> Inner_;
};
+class TSingularTypeBlockSerializer final: public IBlockSerializer {
+private:
+ size_t ArrayMetadataCount() const final {
+ return 0;
+ }
+
+ void StoreMetadata(const arrow::ArrayData& data, const IBlockSerializer::TMetadataSink& metaSink) const final {
+ Y_UNUSED(data, metaSink);
+ }
+
+ void StoreArray(const arrow::ArrayData& data, TChunkedBuffer& dst) const final {
+ Y_UNUSED(data, dst);
+ }
+};
+
+class TSingularTypeBlockDeserializer final: public TBlockDeserializerBase {
+private:
+ void DoLoadMetadata(const TMetadataSource& metaSource) final {
+ Y_UNUSED(metaSource);
+ }
+
+ std::shared_ptr<arrow::ArrayData> DoMakeDefaultValue(const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) const final {
+ Y_UNUSED(offset);
+ Y_ENSURE(nullsCount == 0);
+ Y_ENSURE(!nulls || nulls->size() == 0);
+ return arrow::NullArray(blockLen).data();
+ }
+
+ std::shared_ptr<arrow::ArrayData> DoLoadArray(TChunkedBuffer& src, const std::shared_ptr<arrow::Buffer>& nulls, i64 nullsCount, ui64 blockLen, ui64 offset) final {
+ Y_UNUSED(offset, src);
+ Y_ENSURE(nullsCount == 0);
+ Y_ENSURE(!nulls || nulls->size() == 0);
+ return arrow::NullArray(blockLen).data();
+ }
+
+ bool IsNullable() const final {
+ return false;
+ }
+
+ void DoResetMetadata() final {
+ }
+};
+
template<bool Nullable, typename TDerived>
class TTupleBlockSerializerBase : public IBlockSerializer {
size_t ArrayMetadataCount() const final {
@@ -632,7 +675,7 @@ struct TSerializerTraits {
using TExtOptional = TExtOptionalBlockSerializer;
template<typename TTzDateType, bool Nullable>
using TTzDate = TTzDateBlockSerializer<TTzDateType, Nullable>;
-
+ using TSingularType = TSingularTypeBlockSerializer;
constexpr static bool PassType = false;
static std::unique_ptr<TResult> MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) {
@@ -648,6 +691,10 @@ struct TSerializerTraits {
ythrow yexception() << "Serializer not implemented for block resources";
}
+ static std::unique_ptr<TResult> MakeSingular() {
+ return std::make_unique<TSingularType>();
+ }
+
template<typename TTzDateType>
static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
if (isOptional) {
@@ -670,6 +717,7 @@ struct TDeserializerTraits {
using TExtOptional = TExtOptionalBlockDeserializer;
template<typename TTzDateType, bool Nullable>
using TTzDate = TTzDateBlockDeserializer<TTzDateType, Nullable>;
+ using TSingularType = TSingularTypeBlockDeserializer;
constexpr static bool PassType = false;
@@ -686,6 +734,10 @@ struct TDeserializerTraits {
ythrow yexception() << "Deserializer not implemented for block resources";
}
+ static std::unique_ptr<TResult> MakeSingular() {
+ return std::make_unique<TSingularType>();
+ }
+
template<typename TTzDateType>
static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
if (isOptional) {
diff --git a/yql/essentials/minikql/computation/mkql_block_trimmer.cpp b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp
index b53a3890a4..0b53f91452 100644
--- a/yql/essentials/minikql/computation/mkql_block_trimmer.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_trimmer.cpp
@@ -98,6 +98,17 @@ public:
}
};
+class TSingularBlockTrimmer: public TBlockTrimmerBase {
+public:
+ TSingularBlockTrimmer(arrow::MemoryPool* pool)
+ : TBlockTrimmerBase(pool) {
+ }
+
+ std::shared_ptr<arrow::ArrayData> Trim(const std::shared_ptr<arrow::ArrayData>& array) override {
+ return array;
+ }
+};
+
template<typename TStringType, bool Nullable>
class TStringBlockTrimmer : public TBlockTrimmerBase {
using TOffset = typename TStringType::offset_type;
@@ -217,6 +228,7 @@ struct TTrimmerTraits {
using TResource = TResourceBlockTrimmer<Nullable>;
template<typename TTzDate, bool Nullable>
using TTzDateReader = TTzDateBlockTrimmer<TTzDate, Nullable>;
+ using TSingular = TSingularBlockTrimmer;
constexpr static bool PassType = false;
@@ -237,6 +249,10 @@ struct TTrimmerTraits {
}
}
+ static TResult::TPtr MakeSingular(arrow::MemoryPool* pool) {
+ return std::make_unique<TSingular>(pool);
+ }
+
template<typename TTzDate>
static TResult::TPtr MakeTzDate(bool isOptional, arrow::MemoryPool* pool) {
if (isOptional) {
diff --git a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
index b689e4cf8b..cbff1c5722 100644
--- a/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_node_pack_ut.cpp
@@ -674,6 +674,8 @@ protected:
auto tzDateType = PgmBuilder.NewDataType(NUdf::EDataSlot::TzDate);
auto blockTzDateType = PgmBuilder.NewBlockType(tzDateType, TBlockType::EShape::Many);
+ auto nullType = PgmBuilder.NewNullType();
+ auto blockNullType = PgmBuilder.NewBlockType(nullType, TBlockType::EShape::Many);
auto rowType =
legacyStruct
@@ -683,11 +685,12 @@ protected:
{"_yql_block_length", scalarUi64Type},
{"a", scalarOptStrType},
{"b", blockOptTupleOptUi32StrType},
- {"c", blockTzDateType}
+ {"c", blockTzDateType},
+ {"nill", blockNullType},
})
: PgmBuilder.NewMultiType(
{blockUi32Type, blockOptStrType, scalarOptStrType,
- blockOptTupleOptUi32StrType, blockTzDateType, scalarUi64Type});
+ blockOptTupleOptUi32StrType, blockTzDateType, blockNullType, scalarUi64Type});
ui64 blockLen = 1000;
UNIT_ASSERT_LE(offset + len, blockLen);
@@ -696,6 +699,8 @@ protected:
auto builder2 = MakeArrayBuilder(TTypeInfoHelper(), optStrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optStrType)), nullptr);
auto builder3 = MakeArrayBuilder(TTypeInfoHelper(), optTupleOptUi32StrType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(optTupleOptUi32StrType)), nullptr);
auto builder4 = MakeArrayBuilder(TTypeInfoHelper(), tzDateType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(tzDateType)), nullptr);
+ auto builder5 = MakeArrayBuilder(TTypeInfoHelper(), nullType, *ArrowPool_, CalcBlockLen(CalcMaxBlockItemSize(nullType)), nullptr);
+
for (ui32 i = 0; i < blockLen; ++i) {
TBlockItem b1(i);
@@ -712,6 +717,7 @@ protected:
TBlockItem tzDate {i};
tzDate.SetTimezoneId(i % 100);
builder4->Add(tzDate);
+ builder5->Add(TBlockItem::Zero());
}
std::string_view testScalarString = "foobar";
@@ -725,12 +731,14 @@ protected:
datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf)));
datums.emplace_back(builder3->Build(true));
datums.emplace_back(builder4->Build(true));
+ datums.emplace_back(builder5->Build(true));
} else {
datums.emplace_back(builder1->Build(true));
datums.emplace_back(builder2->Build(true));
datums.emplace_back(arrow::Datum(std::make_shared<arrow::BinaryScalar>(strbuf)));
datums.emplace_back(builder3->Build(true));
datums.emplace_back(builder4->Build(true));
+ datums.emplace_back(builder5->Build(true));
datums.emplace_back(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(blockLen)));
}
@@ -785,6 +793,7 @@ protected:
auto reader2 = MakeBlockReader(TTypeInfoHelper(), optStrType);
auto reader3 = MakeBlockReader(TTypeInfoHelper(), optTupleOptUi32StrType);
auto reader4 = MakeBlockReader(TTypeInfoHelper(), tzDateType);
+ auto reader5 = MakeBlockReader(TTypeInfoHelper(), nullType);
for (ui32 i = offset; i < len; ++i) {
TBlockItem b1 = reader1->GetItem(*TArrowBlock::From(unpackedColumns[0]).GetDatum().array(), i - offset);
@@ -814,6 +823,8 @@ protected:
TBlockItem b4 = reader4->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 5 : 4]).GetDatum().array(), i - offset);
UNIT_ASSERT(b4.Get<ui16>() == i);
UNIT_ASSERT(b4.GetTimezoneId() == (i % 100));
+ TBlockItem b5 = reader5->GetItem(*TArrowBlock::From(unpackedColumns[legacyStruct ? 6 : 5]).GetDatum().array(), i - offset);
+ UNIT_ASSERT(b5);
}
}
}
diff --git a/yql/essentials/minikql/mkql_alloc.cpp b/yql/essentials/minikql/mkql_alloc.cpp
index 963f46a67e..8446522eda 100644
--- a/yql/essentials/minikql/mkql_alloc.cpp
+++ b/yql/essentials/minikql/mkql_alloc.cpp
@@ -7,6 +7,8 @@ namespace NKikimr {
namespace NMiniKQL {
+constexpr ui64 ArrowSizeForArena = (TAllocState::POOL_PAGE_SIZE >> 2);
+
Y_POD_THREAD(TAllocState*) TlsAllocState;
TAllocPageHeader TAllocState::EmptyPageHeader = { 0, 0, 0, 0, nullptr, nullptr };
@@ -94,6 +96,10 @@ void TAllocState::KillAllBoxed() {
OffloadedBlocksRoot.InitLinks();
}
+ if (CurrentArrowPages) {
+ MKQLArrowFree(CurrentArrowPages, 0);
+ CurrentArrowPages = nullptr;
+ }
CleanupArrowList(&ArrowBlocksRoot);
#ifndef NDEBUG
@@ -253,7 +259,49 @@ void TPagedArena::Clear() noexcept {
}
}
+void* MKQLArrowAllocateOnArena(ui64 size) {
+ TAllocState* state = TlsAllocState;
+ Y_ENSURE(state);
+
+ auto alignedSize = AlignUp(size, ArrowAlignment);
+ auto& page = state->CurrentArrowPages;
+
+ if (Y_UNLIKELY(!page || page->Offset + alignedSize > page->Size)) {
+ const auto pageSize = TAllocState::POOL_PAGE_SIZE;
+
+ if (state->EnableArrowTracking) {
+ state->OffloadAlloc(pageSize);
+ }
+
+ if (page) {
+ MKQLArrowFree(page, 0);
+ }
+
+ page = (TMkqlArrowHeader*)GetAlignedPage();
+ page->Offset = sizeof(TMkqlArrowHeader);
+ page->Size = pageSize;
+ page->UseCount = 1;
+
+ if (state->EnableArrowTracking) {
+ page->Entry.Link(&state->ArrowBlocksRoot);
+ Y_ENSURE(state->ArrowBuffers.insert(page).second);
+ } else {
+ page->Entry.Clear();
+ }
+ }
+
+ void* ptr = (ui8*)page + page->Offset;
+ page->Offset += alignedSize;
+ ++page->UseCount;
+
+ return ptr;
+}
+
void* MKQLArrowAllocate(ui64 size) {
+ if (size <= ArrowSizeForArena) {
+ return MKQLArrowAllocateOnArena(size);
+ }
+
TAllocState* state = TlsAllocState;
Y_ENSURE(state);
auto fullSize = size + sizeof(TMkqlArrowHeader);
@@ -276,6 +324,9 @@ void* MKQLArrowAllocate(ui64 size) {
#endif
auto* header = (TMkqlArrowHeader*)ptr;
+ header->Offset = 0;
+ header->UseCount = 0;
+
if (state->EnableArrowTracking) {
header->Entry.Link(&state->ArrowBlocksRoot);
Y_ENSURE(state->ArrowBuffers.insert(header + 1).second);
@@ -294,7 +345,31 @@ void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size) {
return res;
}
+void MKQLArrowFreeOnArena(const void* ptr) {
+ auto* page = (TMkqlArrowHeader*)TAllocState::GetPageStart(ptr);
+ if (page->UseCount.fetch_sub(1) == 1) {
+ if (!page->Entry.IsUnlinked()) {
+ TAllocState* state = TlsAllocState;
+ Y_ENSURE(state);
+ state->OffloadFree(page->Size);
+ page->Entry.Unlink();
+
+ auto it = state->ArrowBuffers.find(page);
+ Y_ENSURE(it != state->ArrowBuffers.end());
+ state->ArrowBuffers.erase(it);
+ }
+
+ ReleaseAlignedPage(page);
+ }
+
+ return;
+}
+
void MKQLArrowFree(const void* mem, ui64 size) {
+ if (size <= ArrowSizeForArena) {
+ return MKQLArrowFreeOnArena(mem);
+ }
+
auto fullSize = size + sizeof(TMkqlArrowHeader);
auto header = ((TMkqlArrowHeader*)mem) - 1;
if (!header->Entry.IsUnlinked()) {
@@ -318,19 +393,37 @@ void MKQLArrowFree(const void* mem, ui64 size) {
ReleaseAlignedPage(header, fullSize);
}
-void MKQLArrowUntrack(const void* mem) {
+void MKQLArrowUntrack(const void* mem, ui64 size) {
TAllocState* state = TlsAllocState;
Y_ENSURE(state);
if (!state->EnableArrowTracking) {
return;
}
+ if (size <= ArrowSizeForArena) {
+ auto* page = (TMkqlArrowHeader*)TAllocState::GetPageStart(mem);
+
+ auto it = state->ArrowBuffers.find(page);
+ if (it == state->ArrowBuffers.end()) {
+ return;
+ }
+
+ if (!page->Entry.IsUnlinked()) {
+ page->Entry.Unlink(); // unlink page immediately so we don't accidentally free untracked memory within `TAllocState`
+ state->ArrowBuffers.erase(it);
+ state->OffloadFree(page->Size);
+ }
+
+ return;
+ }
+
auto it = state->ArrowBuffers.find(mem);
if (it == state->ArrowBuffers.end()) {
return;
}
- auto header = ((TMkqlArrowHeader*)mem) - 1;
+ auto* header = ((TMkqlArrowHeader*)mem) - 1;
+ Y_ENSURE(header->UseCount == 0);
if (!header->Entry.IsUnlinked()) {
header->Entry.Unlink();
auto fullSize = header->Size + sizeof(TMkqlArrowHeader);
diff --git a/yql/essentials/minikql/mkql_alloc.h b/yql/essentials/minikql/mkql_alloc.h
index abcb6cc73d..24bbbb8e9e 100644
--- a/yql/essentials/minikql/mkql_alloc.h
+++ b/yql/essentials/minikql/mkql_alloc.h
@@ -41,6 +41,8 @@ constexpr ui32 MaxPageUserData = TAlignedPagePool::POOL_PAGE_SIZE - sizeof(TAllo
static_assert(sizeof(TAllocPageHeader) % MKQL_ALIGNMENT == 0, "Incorrect size of header");
+struct TMkqlArrowHeader;
+
struct TAllocState : public TAlignedPagePool
{
struct TListEntry {
@@ -90,6 +92,7 @@ struct TAllocState : public TAlignedPagePool
TListEntry GlobalPAllocList;
TListEntry* CurrentPAllocList;
TListEntry ArrowBlocksRoot;
+ TMkqlArrowHeader* CurrentArrowPages = nullptr; // page arena for small arrow allocations
std::unordered_set<const void*> ArrowBuffers;
bool EnableArrowTracking = true;
@@ -186,7 +189,9 @@ constexpr size_t ArrowAlignment = 64;
struct TMkqlArrowHeader {
TAllocState::TListEntry Entry;
ui64 Size;
- char Padding[ArrowAlignment - sizeof(TAllocState::TListEntry) - sizeof(ui64)];
+ ui64 Offset;
+ std::atomic<ui64> UseCount;
+ char Padding[ArrowAlignment - sizeof(TAllocState::TListEntry) - sizeof(ui64) - sizeof(ui64) - sizeof(std::atomic<ui64>)];
};
static_assert(sizeof(TMkqlArrowHeader) == ArrowAlignment);
@@ -441,7 +446,7 @@ inline void MKQLUnregisterObject(NUdf::TBoxedValue* value) noexcept {
void* MKQLArrowAllocate(ui64 size);
void* MKQLArrowReallocate(const void* mem, ui64 prevSize, ui64 size);
void MKQLArrowFree(const void* mem, ui64 size);
-void MKQLArrowUntrack(const void* mem);
+void MKQLArrowUntrack(const void* mem, ui64 size);
template <const EMemorySubPool MemoryPoolExt = EMemorySubPool::Default>
struct TWithMiniKQLAlloc {
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index b139c24058..e03a5e3b28 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -20,6 +20,8 @@ using namespace std::string_view_literals;
namespace NKikimr {
namespace NMiniKQL {
+static_assert(RuntimeVersion >= 20);
+
namespace {
struct TDataFunctionFlags {
@@ -403,10 +405,6 @@ TRuntimeNode TProgramBuilder::Arg(TType* type) const {
}
TRuntimeNode TProgramBuilder::WideFlowArg(TType* type) const {
- if constexpr (RuntimeVersion < 18U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
TCallableBuilder builder(Env, __func__, type, true);
return TRuntimeNode(builder.Build(), false);
}
@@ -973,10 +971,6 @@ TRuntimeNode TProgramBuilder::ToList(TRuntimeNode optional) {
}
TRuntimeNode TProgramBuilder::Iterable(TZeroLambda lambda) {
- if constexpr (RuntimeVersion < 19U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
const auto itemArg = Arg(NewNull().GetStaticType());
auto lambdaRes = lambda();
const auto resultType = NewListType(AS_TYPE(TStreamType, lambdaRes.GetStaticType())->GetItemType());
@@ -1397,9 +1391,6 @@ TRuntimeNode TProgramBuilder::Iterator(TRuntimeNode list, const TArrayRef<const
TRuntimeNode TProgramBuilder::EmptyIterator(TType* streamType) {
MKQL_ENSURE(streamType->IsStream() || streamType->IsFlow(), "Expected stream or flow.");
- if (RuntimeVersion < 7U && streamType->IsFlow()) {
- return ToFlow(EmptyIterator(NewStreamType(AS_TYPE(TFlowType, streamType)->GetItemType())));
- }
TCallableBuilder callableBuilder(Env, __func__, streamType);
return TRuntimeNode(callableBuilder.Build(), false);
}
@@ -1435,11 +1426,6 @@ TRuntimeNode TProgramBuilder::LazyList(TRuntimeNode list) {
TRuntimeNode TProgramBuilder::ForwardList(TRuntimeNode stream) {
const auto type = stream.GetStaticType();
MKQL_ENSURE(type->IsStream() || type->IsFlow(), "Expected flow or stream.");
- if constexpr (RuntimeVersion < 10U) {
- if (type->IsFlow()) {
- return ForwardList(FromFlow(stream));
- }
- }
TCallableBuilder callableBuilder(Env, __func__, NewListType(type->IsFlow() ? AS_TYPE(TFlowType, stream)->GetItemType() : AS_TYPE(TStreamType, stream)->GetItemType()));
callableBuilder.Add(stream);
return TRuntimeNode(callableBuilder.Build(), false);
@@ -2054,29 +2040,7 @@ TRuntimeNode TProgramBuilder::Lookup(TRuntimeNode dict, TRuntimeNode key) {
return TRuntimeNode(callableBuilder.Build(), false);
}
-TRuntimeNode TProgramBuilder::DictItems(TRuntimeNode dict, EDictItems mode) {
- const auto dictTypeChecked = AS_TYPE(TDictType, dict.GetStaticType());
- TType* itemType;
- switch (mode) {
- case EDictItems::Both: {
- const std::array<TType*, 2U> tupleTypes = {{ dictTypeChecked->GetKeyType(), dictTypeChecked->GetPayloadType() }};
- itemType = NewTupleType(tupleTypes);
- break;
- }
- case EDictItems::Keys: itemType = dictTypeChecked->GetKeyType(); break;
- case EDictItems::Payloads: itemType = dictTypeChecked->GetPayloadType(); break;
- }
-
- TCallableBuilder callableBuilder(Env, __func__, NewListType(itemType));
- callableBuilder.Add(dict);
- callableBuilder.Add(NewDataLiteral((ui32)mode));
- return TRuntimeNode(callableBuilder.Build(), false);
-}
-
TRuntimeNode TProgramBuilder::DictItems(TRuntimeNode dict) {
- if constexpr (RuntimeVersion < 6U) {
- return DictItems(dict, EDictItems::Both);
- }
const auto dictTypeChecked = AS_TYPE(TDictType, dict.GetStaticType());
const auto itemType = NewTupleType({ dictTypeChecked->GetKeyType(), dictTypeChecked->GetPayloadType() });
TCallableBuilder callableBuilder(Env, __func__, NewListType(itemType));
@@ -2085,9 +2049,6 @@ TRuntimeNode TProgramBuilder::DictItems(TRuntimeNode dict) {
}
TRuntimeNode TProgramBuilder::DictKeys(TRuntimeNode dict) {
- if constexpr (RuntimeVersion < 6U) {
- return DictItems(dict, EDictItems::Keys);
- }
const auto dictTypeChecked = AS_TYPE(TDictType, dict.GetStaticType());
TCallableBuilder callableBuilder(Env, __func__, NewListType(dictTypeChecked->GetKeyType()));
callableBuilder.Add(dict);
@@ -2095,9 +2056,6 @@ TRuntimeNode TProgramBuilder::DictKeys(TRuntimeNode dict) {
}
TRuntimeNode TProgramBuilder::DictPayloads(TRuntimeNode dict) {
- if constexpr (RuntimeVersion < 6U) {
- return DictItems(dict, EDictItems::Payloads);
- }
const auto dictTypeChecked = AS_TYPE(TDictType, dict.GetStaticType());
TCallableBuilder callableBuilder(Env, __func__, NewListType(dictTypeChecked->GetPayloadType()));
callableBuilder.Add(dict);
@@ -3019,10 +2977,6 @@ TRuntimeNode TProgramBuilder::SourceOf(TType* returnType) {
}
TRuntimeNode TProgramBuilder::Source() {
- if constexpr (RuntimeVersion < 18U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
TCallableBuilder callableBuilder(Env, __func__, NewFlowType(NewMultiType({})));
return TRuntimeNode(callableBuilder.Build(), false);
}
@@ -3197,9 +3151,7 @@ TRuntimeNode TProgramBuilder::QueueCreate(TRuntimeNode initCapacity, TRuntimeNod
auto resType = AS_TYPE(TResourceType, returnType);
const auto tag = resType->GetTag();
- if (initCapacity.GetStaticType()->IsVoid()) {
- MKQL_ENSURE(RuntimeVersion >= 13, "Unbounded queue is not supported in runtime version " << RuntimeVersion);
- } else {
+ if (!initCapacity.GetStaticType()->IsVoid()) {
auto initCapacityType = AS_TYPE(TDataType, initCapacity);
MKQL_ENSURE(initCapacityType->GetSchemeType() == NUdf::TDataType<ui64>::Id, "init capcity must be ui64");
}
@@ -3253,8 +3205,6 @@ TRuntimeNode TProgramBuilder::QueuePeek(TRuntimeNode resource, TRuntimeNode inde
}
TRuntimeNode TProgramBuilder::QueueRange(TRuntimeNode resource, TRuntimeNode begin, TRuntimeNode end, const TArrayRef<const TRuntimeNode>& dependentNodes, TType* returnType) {
- MKQL_ENSURE(RuntimeVersion >= 14, "QueueRange is not supported in runtime version " << RuntimeVersion);
-
MKQL_ENSURE(returnType->IsList(), "Expected list type as result of QueueRange");
auto resType = AS_TYPE(TResourceType, resource);
@@ -3291,8 +3241,6 @@ TRuntimeNode TProgramBuilder::PreserveStream(TRuntimeNode stream, TRuntimeNode q
}
TRuntimeNode TProgramBuilder::Seq(const TArrayRef<const TRuntimeNode>& args, TType* returnType) {
- MKQL_ENSURE(RuntimeVersion >= 15, "Seq is not supported in runtime version " << RuntimeVersion);
-
TCallableBuilder callableBuilder(Env, __func__, returnType);
for (auto node : args) {
callableBuilder.Add(node);
@@ -3738,16 +3686,6 @@ TRuntimeNode TProgramBuilder::BuildFlatMap(const std::string_view& callableName,
TRuntimeNode TProgramBuilder::MultiMap(TRuntimeNode list, const TExpandLambda& handler)
{
- if constexpr (RuntimeVersion < 16U) {
- const auto single = [=](TRuntimeNode item) -> TRuntimeNode {
- const auto newList = handler(item);
- const auto retItemType = newList.front().GetStaticType();
- MKQL_ENSURE(retItemType->IsSameType(*newList.back().GetStaticType()), "Must be same type.");
- return NewList(retItemType, newList);
- };
- return OrderedFlatMap(list, single);
- }
-
const auto listType = list.GetStaticType();
MKQL_ENSURE(listType->IsFlow() || listType->IsList(), "Expected flow, list, stream or optional");
@@ -3770,10 +3708,6 @@ TRuntimeNode TProgramBuilder::MultiMap(TRuntimeNode list, const TExpandLambda& h
}
TRuntimeNode TProgramBuilder::NarrowMultiMap(TRuntimeNode flow, const TWideLambda& handler) {
- if constexpr (RuntimeVersion < 18U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
@@ -3795,10 +3729,6 @@ TRuntimeNode TProgramBuilder::NarrowMultiMap(TRuntimeNode flow, const TWideLambd
}
TRuntimeNode TProgramBuilder::ExpandMap(TRuntimeNode flow, const TExpandLambda& handler) {
- if constexpr (RuntimeVersion < 18U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
const auto itemType = AS_TYPE(TFlowType, flow.GetStaticType())->GetItemType();
const auto itemArg = Arg(itemType);
const auto newItems = handler(itemArg);
@@ -3815,10 +3745,6 @@ TRuntimeNode TProgramBuilder::ExpandMap(TRuntimeNode flow, const TExpandLambda&
}
TRuntimeNode TProgramBuilder::WideMap(TRuntimeNode flow, const TWideLambda& handler) {
- if constexpr (RuntimeVersion < 18U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
@@ -3875,10 +3801,6 @@ TRuntimeNode TProgramBuilder::WideChain1Map(TRuntimeNode flow, const TWideLambda
}
TRuntimeNode TProgramBuilder::NarrowMap(TRuntimeNode flow, const TNarrowLambda& handler) {
- if constexpr (RuntimeVersion < 18U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
@@ -3896,10 +3818,6 @@ TRuntimeNode TProgramBuilder::NarrowMap(TRuntimeNode flow, const TNarrowLambda&
}
TRuntimeNode TProgramBuilder::NarrowFlatMap(TRuntimeNode flow, const TNarrowLambda& handler) {
- if constexpr (RuntimeVersion < 18U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
@@ -3931,10 +3849,6 @@ TRuntimeNode TProgramBuilder::NarrowFlatMap(TRuntimeNode flow, const TNarrowLamb
}
TRuntimeNode TProgramBuilder::BuildWideFilter(const std::string_view& callableName, TRuntimeNode flow, const TNarrowLambda& handler) {
- if constexpr (RuntimeVersion < 18U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
@@ -4019,10 +3933,6 @@ TRuntimeNode TProgramBuilder::BuildFilter(const std::string_view& callableName,
TRuntimeNode TProgramBuilder::BuildFilter(const std::string_view& callableName, TRuntimeNode list, TRuntimeNode limit, const TUnaryLambda& handler, TType* resultType)
{
- if constexpr (RuntimeVersion < 4U) {
- return Take(BuildFilter(callableName, list, handler, resultType), limit);
- }
-
const auto listType = list.GetStaticType();
MKQL_ENSURE(listType->IsFlow() || listType->IsList() || listType->IsStream(), "Expected flow, list or stream.");
MKQL_ENSURE(limit.GetStaticType()->IsData(), "Expected data");
@@ -4322,15 +4232,12 @@ TRuntimeNode TProgramBuilder::Apply(TRuntimeNode callableNode, const TArrayRef<c
<< " with static " << arg.GetStaticType()->GetKindAsStr());
}
- TCallableBuilder callableBuilder(Env, RuntimeVersion >= 8 ? "Apply2" : "Apply", callableType->GetReturnType());
+ TCallableBuilder callableBuilder(Env, "Apply2", callableType->GetReturnType());
callableBuilder.Add(callableNode);
callableBuilder.Add(NewDataLiteral<ui32>(dependentCount));
-
- if constexpr (RuntimeVersion >= 8) {
- callableBuilder.Add(NewDataLiteral<NUdf::EDataSlot::String>(file));
- callableBuilder.Add(NewDataLiteral(row));
- callableBuilder.Add(NewDataLiteral(column));
- }
+ callableBuilder.Add(NewDataLiteral<NUdf::EDataSlot::String>(file));
+ callableBuilder.Add(NewDataLiteral(row));
+ callableBuilder.Add(NewDataLiteral(column));
for (const auto& arg: args) {
callableBuilder.Add(arg);
@@ -4365,12 +4272,11 @@ TRuntimeNode TProgramBuilder::Callable(TType* callableType, const TArrayLambda&
}
TRuntimeNode TProgramBuilder::NewNull() {
- if (!UseNullType || RuntimeVersion < 11) {
- TCallableBuilder callableBuilder(Env, "Null", NewOptionalType(Env.GetVoidLazy()->GetType()));
- return TRuntimeNode(callableBuilder.Build(), false);
- } else {
+ if (UseNullType) {
return TRuntimeNode(Env.GetNullLazy(), true);
}
+ TCallableBuilder callableBuilder(Env, "Null", NewOptionalType(Env.GetVoidLazy()->GetType()));
+ return TRuntimeNode(callableBuilder.Build(), false);
}
TRuntimeNode TProgramBuilder::Concat(TRuntimeNode data1, TRuntimeNode data2) {
@@ -4403,18 +4309,10 @@ TRuntimeNode TProgramBuilder::RFind(TRuntimeNode haystack, TRuntimeNode needle,
}
TRuntimeNode TProgramBuilder::StartsWith(TRuntimeNode string, TRuntimeNode prefix) {
- if constexpr (RuntimeVersion < 19U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
return DataCompare(__func__, string, prefix);
}
TRuntimeNode TProgramBuilder::EndsWith(TRuntimeNode string, TRuntimeNode suffix) {
- if constexpr (RuntimeVersion < 19U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
return DataCompare(__func__, string, suffix);
}
@@ -4746,10 +4644,6 @@ TRuntimeNode TProgramBuilder::CommonJoinCore(TRuntimeNode flow, EJoinKind joinKi
ui64 memLimit, std::optional<ui32> sortedTableOrder,
EAnyJoinSettings anyJoinSettings, const ui32 tableIndexField, TType* returnType) {
- if constexpr (RuntimeVersion < 17U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
MKQL_ENSURE(leftColumns.size() % 2U == 0U, "Expected even count");
MKQL_ENSURE(rightColumns.size() % 2U == 0U, "Expected even count");
@@ -4796,10 +4690,6 @@ TRuntimeNode TProgramBuilder::CommonJoinCore(TRuntimeNode flow, EJoinKind joinKi
}
TRuntimeNode TProgramBuilder::WideCombiner(TRuntimeNode flow, i64 memLimit, const TWideLambda& extractor, const TBinaryWideLambda& init, const TTernaryWideLambda& update, const TBinaryWideLambda& finish) {
- if constexpr (RuntimeVersion < 18U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
if (memLimit < 0) {
if constexpr (RuntimeVersion < 46U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__ << " with limit " << memLimit;
@@ -4934,10 +4824,6 @@ TRuntimeNode TProgramBuilder::WideLastCombinerWithSpilling(TRuntimeNode flow, co
}
TRuntimeNode TProgramBuilder::WideCondense1(TRuntimeNode flow, const TWideLambda& init, const TWideSwitchLambda& switcher, const TBinaryWideLambda& update, bool useCtx) {
- if constexpr (RuntimeVersion < 18U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs;
@@ -4983,10 +4869,6 @@ TRuntimeNode TProgramBuilder::CombineCore(TRuntimeNode stream,
const TBinaryLambda& finish,
ui64 memLimit)
{
- if constexpr (RuntimeVersion < 3U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
const bool isStream = stream.GetStaticType()->IsStream();
const auto itemType = isStream ? AS_TYPE(TStreamType, stream)->GetItemType() : AS_TYPE(TFlowType, stream)->GetItemType();
@@ -5037,10 +4919,6 @@ TRuntimeNode TProgramBuilder::GroupingCore(TRuntimeNode stream,
const TUnaryLambda& keyExtractor,
const TUnaryLambda& handler)
{
- if (handler && RuntimeVersion < 20U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__ << " with handler";
- }
-
auto itemType = AS_TYPE(TStreamType, stream)->GetItemType();
TRuntimeNode keyExtractorItemArg = Arg(itemType);
@@ -5084,13 +4962,6 @@ TRuntimeNode TProgramBuilder::Chopper(TRuntimeNode flow, const TUnaryLambda& key
const auto flowType = flow.GetStaticType();
MKQL_ENSURE(flowType->IsFlow() || flowType->IsStream(), "Expected flow or stream.");
-
- if constexpr (RuntimeVersion < 9U) {
- return FlatMap(GroupingCore(flow, groupSwitch, keyExtractor),
- [&](TRuntimeNode item) -> TRuntimeNode { return groupHandler(Nth(item, 0U), Nth(item, 1U)); }
- );
- }
-
const bool isStream = flowType->IsStream();
const auto itemType = isStream ? AS_TYPE(TStreamType, flow)->GetItemType() : AS_TYPE(TFlowType, flow)->GetItemType();
@@ -5119,10 +4990,6 @@ TRuntimeNode TProgramBuilder::Chopper(TRuntimeNode flow, const TUnaryLambda& key
TRuntimeNode TProgramBuilder::WideChopper(TRuntimeNode flow, const TWideLambda& extractor, const TWideSwitchLambda& groupSwitch,
const std::function<TRuntimeNode (TRuntimeNode::TList, TRuntimeNode)>& groupHandler
) {
- if constexpr (RuntimeVersion < 18U) {
- THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
- }
-
const auto wideComponents = GetWideComponents(AS_TYPE(TFlowType, flow.GetStaticType()));
TRuntimeNode::TList itemArgs, keyArgs;
@@ -5564,11 +5431,9 @@ TRuntimeNode TProgramBuilder::Replicate(TRuntimeNode item, TRuntimeNode count, c
TCallableBuilder callableBuilder(Env, __func__, listType);
callableBuilder.Add(item);
callableBuilder.Add(count);
- if constexpr (RuntimeVersion >= 2) {
- callableBuilder.Add(NewDataLiteral<NUdf::EDataSlot::String>(file));
- callableBuilder.Add(NewDataLiteral(row));
- callableBuilder.Add(NewDataLiteral(column));
- }
+ callableBuilder.Add(NewDataLiteral<NUdf::EDataSlot::String>(file));
+ callableBuilder.Add(NewDataLiteral(row));
+ callableBuilder.Add(NewDataLiteral(column));
return TRuntimeNode(callableBuilder.Build(), false);
}
diff --git a/yql/essentials/minikql/mkql_type_builder.cpp b/yql/essentials/minikql/mkql_type_builder.cpp
index d1df31a97d..231231eee7 100644
--- a/yql/essentials/minikql/mkql_type_builder.cpp
+++ b/yql/essentials/minikql/mkql_type_builder.cpp
@@ -1522,6 +1522,17 @@ bool ConvertArrowTypeImpl(NUdf::EDataSlot slot, std::shared_ptr<arrow::DataType>
}
}
+inline bool IsSingularType(const TType* type) {
+ return type->IsNull() ||
+ type->IsVoid() ||
+ type->IsEmptyDict() ||
+ type->IsEmptyList();
+}
+
+inline bool NeedWrapWithExternalOptional(const TType* type) {
+ return type->IsPg() || IsSingularType(type);
+}
+
bool ConvertArrowTypeImpl(TType* itemType, std::shared_ptr<arrow::DataType>& type, const TArrowConvertFailedCallback& onFail, bool output) {
bool isOptional;
auto unpacked = UnpackOptional(itemType, isOptional);
@@ -1534,8 +1545,7 @@ bool ConvertArrowTypeImpl(TType* itemType, std::shared_ptr<arrow::DataType>& typ
return false;
}
- if (unpacked->IsOptional() || isOptional && unpacked->IsPg()) {
- // at least 2 levels of optionals
+ if (unpacked->IsOptional() || isOptional && NeedWrapWithExternalOptional(unpacked)) {
ui32 nestLevel = 0;
auto currentType = itemType;
auto previousType = itemType;
@@ -1545,12 +1555,11 @@ bool ConvertArrowTypeImpl(TType* itemType, std::shared_ptr<arrow::DataType>& typ
currentType = AS_TYPE(TOptionalType, currentType)->GetItemType();
} while (currentType->IsOptional());
- if (currentType->IsPg()) {
+ if (NeedWrapWithExternalOptional(currentType)) {
previousType = currentType;
++nestLevel;
}
- // previousType is always Optional
std::shared_ptr<arrow::DataType> innerArrowType;
if (!ConvertArrowTypeImpl(previousType, innerArrowType, onFail, output)) {
return false;
@@ -1618,6 +1627,11 @@ bool ConvertArrowTypeImpl(TType* itemType, std::shared_ptr<arrow::DataType>& typ
return true;
}
+ if (IsSingularType(unpacked)) {
+ type = arrow::null();
+ return true;
+ }
+
if (!unpacked->IsData()) {
if (onFail) {
onFail(unpacked);
@@ -2479,6 +2493,10 @@ size_t CalcMaxBlockItemSize(const TType* type) {
return sizeof(NYql::NUdf::TUnboxedValue);
}
+ if (IsSingularType(type)) {
+ return 0;
+ }
+
if (type->IsData()) {
auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
switch (slot) {
@@ -2552,6 +2570,7 @@ struct TComparatorTraits {
using TExtOptional = NUdf::TExternalOptionalBlockItemComparator;
template <typename T, bool Nullable>
using TTzDateComparator = NUdf::TTzDateBlockItemComparator<T, Nullable>;
+ using TSingularType = NUdf::TSingularTypeBlockItemComparator;
constexpr static bool PassType = false;
@@ -2565,6 +2584,10 @@ struct TComparatorTraits {
ythrow yexception() << "Comparator not implemented for block resources: ";
}
+ static std::unique_ptr<TResult> MakeSingular() {
+ return std::make_unique<TSingularType>();
+ }
+
template<typename TTzDate>
static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
if (isOptional) {
@@ -2586,6 +2609,7 @@ struct THasherTraits {
using TExtOptional = NUdf::TExternalOptionalBlockItemHasher;
template <typename T, bool Nullable>
using TTzDateHasher = NYql::NUdf::TTzDateBlockItemHasher<T, Nullable>;
+ using TSingularType = NUdf::TSingularTypeBlockItemHaser;
constexpr static bool PassType = false;
@@ -2607,6 +2631,10 @@ struct THasherTraits {
return std::make_unique<TTzDateHasher<TTzDate, false>>();
}
}
+
+ static std::unique_ptr<TResult> MakeSingular() {
+ return std::make_unique<TSingularType>();
+ }
};
NUdf::IBlockItemComparator::TPtr TBlockTypeHelper::MakeComparator(NUdf::TType* type) const {
@@ -2622,12 +2650,11 @@ TType* TTypeBuilder::NewVoidType() const {
}
TType* TTypeBuilder::NewNullType() const {
- if (!UseNullType || RuntimeVersion < 11) {
- TCallableBuilder callableBuilder(Env, "Null", NewOptionalType(NewVoidType()));
- return TRuntimeNode(callableBuilder.Build(), false).GetStaticType();
- } else {
+ if (UseNullType) {
return TRuntimeNode(Env.GetNullLazy(), true).GetStaticType();
}
+ TCallableBuilder callableBuilder(Env, "Null", NewOptionalType(NewVoidType()));
+ return TRuntimeNode(callableBuilder.Build(), false).GetStaticType();
}
TType* TTypeBuilder::NewEmptyStructType() const {
diff --git a/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp b/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp
index 123b59cc04..983bf85542 100644
--- a/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp
+++ b/yql/essentials/providers/common/mkql/yql_provider_mkql.cpp
@@ -2236,22 +2236,12 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
AddCallable("EmptyList", [](const TExprNode& node, TMkqlBuildContext& ctx) {
Y_UNUSED(node);
- if (RuntimeVersion < 11) {
- return ctx.ProgramBuilder.NewEmptyList(ctx.ProgramBuilder.NewVoid().GetStaticType());
- } else {
- return TRuntimeNode(ctx.ProgramBuilder.GetTypeEnvironment().GetEmptyListLazy(), true);
- }
+ return TRuntimeNode(ctx.ProgramBuilder.GetTypeEnvironment().GetEmptyListLazy(), true);
});
AddCallable("EmptyDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
Y_UNUSED(node);
- if (RuntimeVersion < 11) {
- auto voidType = ctx.ProgramBuilder.NewVoid().GetStaticType();
- auto dictType = ctx.ProgramBuilder.NewDictType(voidType, voidType, false);
- return ctx.ProgramBuilder.NewDict(dictType, {});
- } else {
- return TRuntimeNode(ctx.ProgramBuilder.GetTypeEnvironment().GetEmptyDictLazy(), true);
- }
+ return TRuntimeNode(ctx.ProgramBuilder.GetTypeEnvironment().GetEmptyDictLazy(), true);
});
AddCallable("SourceOf", [](const TExprNode& node, TMkqlBuildContext& ctx) {
diff --git a/yql/essentials/providers/common/mkql/yql_type_mkql.cpp b/yql/essentials/providers/common/mkql/yql_type_mkql.cpp
index cb001c847b..372ba7a98e 100644
--- a/yql/essentials/providers/common/mkql/yql_type_mkql.cpp
+++ b/yql/essentials/providers/common/mkql/yql_type_mkql.cpp
@@ -184,20 +184,10 @@ NKikimr::NMiniKQL::TType* BuildTypeImpl(const TTypeAnnotationNode& annotation, c
}
case ETypeAnnotationKind::EmptyList: {
- if (NKikimr::NMiniKQL::RuntimeVersion < 11) {
- auto voidType = typeBuilder.NewVoidType();
- return typeBuilder.NewListType(voidType);
- }
-
return typeBuilder.GetTypeEnvironment().GetTypeOfEmptyListLazy();
}
case ETypeAnnotationKind::EmptyDict: {
- if constexpr(NKikimr::NMiniKQL::RuntimeVersion < 11) {
- auto voidType = typeBuilder.NewVoidType();
- return typeBuilder.NewDictType(voidType, voidType, false);
- }
-
return typeBuilder.GetTypeEnvironment().GetTypeOfEmptyDictLazy();
}
diff --git a/yql/essentials/providers/common/schema/mkql/yql_mkql_schema.cpp b/yql/essentials/providers/common/schema/mkql/yql_mkql_schema.cpp
index c172a0a09f..8273070925 100644
--- a/yql/essentials/providers/common/schema/mkql/yql_mkql_schema.cpp
+++ b/yql/essentials/providers/common/schema/mkql/yql_mkql_schema.cpp
@@ -192,17 +192,9 @@ struct TRuntimeTypeLoader {
return Builder.GetTypeEnvironment().GetTypeOfTypeLazy();
}
TMaybe<TType> LoadEmptyListType(ui32 /*level*/) {
- if (NKikimr::NMiniKQL::RuntimeVersion < 11) {
- return Builder.NewListType(Builder.NewVoid().GetStaticType());
- }
-
return Builder.GetTypeEnvironment().GetTypeOfEmptyListLazy();
}
TMaybe<TType> LoadEmptyDictType(ui32 /*level*/) {
- if (NKikimr::NMiniKQL::RuntimeVersion < 11) {
- return Builder.NewDictType(Builder.NewVoid().GetStaticType(), Builder.NewVoid().GetStaticType(), false);
- }
-
return Builder.GetTypeEnvironment().GetTypeOfEmptyDictLazy();
}
TMaybe<TType> LoadDataType(const TString& dataType, ui32 /*level*/) {
diff --git a/yql/essentials/public/issue/yql_warning.cpp b/yql/essentials/public/issue/yql_warning.cpp
index 881e63e95f..6364025ff3 100644
--- a/yql/essentials/public/issue/yql_warning.cpp
+++ b/yql/essentials/public/issue/yql_warning.cpp
@@ -31,6 +31,10 @@ TWarningRule::EParseResult TWarningRule::ParseFrom(const TString& codePattern, c
return EParseResult::PARSE_OK;
}
+TWarningPolicy::TWarningPolicy(bool isReplay)
+ : IsReplay(isReplay)
+{}
+
void TWarningPolicy::AddRule(const TWarningRule& rule)
{
TString pattern = rule.GetPattern();
@@ -38,6 +42,10 @@ void TWarningPolicy::AddRule(const TWarningRule& rule)
return;
}
+ if (pattern == "*" && IsReplay) {
+ return;
+ }
+
Rules.push_back(rule);
EWarningAction action = rule.GetAction();
diff --git a/yql/essentials/public/issue/yql_warning.h b/yql/essentials/public/issue/yql_warning.h
index d1d6a90922..7c3939d1d3 100644
--- a/yql/essentials/public/issue/yql_warning.h
+++ b/yql/essentials/public/issue/yql_warning.h
@@ -34,6 +34,8 @@ using TWarningRules = TVector<TWarningRule>;
class TWarningPolicy {
public:
+ TWarningPolicy(bool isReplay = false);
+
void AddRule(const TWarningRule& rule);
EWarningAction GetAction(TIssueCode code) const;
@@ -43,6 +45,7 @@ public:
void Clear();
private:
+ const bool IsReplay;
TWarningRules Rules;
EWarningAction BaseAction = EWarningAction::DEFAULT;
THashMap<TIssueCode, EWarningAction> Overrides;
diff --git a/yql/essentials/public/udf/arrow/block_builder.h b/yql/essentials/public/udf/arrow/block_builder.h
index 92f4f7e123..baac1842b9 100644
--- a/yql/essentials/public/udf/arrow/block_builder.h
+++ b/yql/essentials/public/udf/arrow/block_builder.h
@@ -10,6 +10,7 @@
#include <yql/essentials/public/udf/udf_value_builder.h>
#include <yql/essentials/public/udf/udf_type_inspection.h>
+#include <arrow/array/array_base.h>
#include <arrow/datum.h>
#include <arrow/c/bridge.h>
@@ -1358,6 +1359,53 @@ private:
std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
};
+class TSingularBlockBuilder final: public TArrayBuilderBase {
+public:
+ TSingularBlockBuilder(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool,
+ size_t maxLen, const TParams& params = {})
+ : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen, params) {
+ Reserve();
+ }
+
+ void DoAdd(NUdf::TUnboxedValuePod value) final {
+ Y_UNUSED(value);
+ }
+
+ void DoAdd(TBlockItem value) final {
+ Y_UNUSED(value);
+ }
+
+ void DoAdd(TInputBuffer& input) final {
+ Y_UNUSED(input.PopChar());
+ }
+
+ void DoAddDefault() final {}
+
+ void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
+ Y_UNUSED(array, sparseBitmap, popCount);
+ }
+
+ void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
+ Y_UNUSED(array, beginIndex, count);
+ }
+
+ void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
+ Y_UNUSED(array, indexes, count);
+ }
+
+ TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
+ TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
+ Y_UNUSED(finish);
+ result->Payload.push_back(arrow::NullArray(GetCurrLen()).data());
+ return result;
+ }
+
+private:
+ size_t DoReserve() final {
+ return 0;
+ }
+};
+
using TArrayBuilderParams = TArrayBuilderBase::TParams;
struct TBuilderTraits {
@@ -1373,6 +1421,7 @@ struct TBuilderTraits {
using TResource = TResourceArrayBuilder<Nullable>;
template<typename TTzDate, bool Nullable>
using TTzDateReader = TTzDateArrayBuilder<TTzDate, Nullable>;
+ using TSingular = TSingularBlockBuilder;
constexpr static bool PassType = true;
@@ -1412,6 +1461,10 @@ struct TBuilderTraits {
return std::make_unique<TTzDateReader<TTzDate, false>>(type, typeInfoHelper, pool, maxLen, params);
}
}
+
+ static std::unique_ptr<TResult> MakeSingular(const TType* type, const ITypeInfoHelper& typeInfoHelper, arrow::MemoryPool& pool, size_t maxLen, const TArrayBuilderParams& params) {
+ return std::make_unique<TSingular>(type, typeInfoHelper, pool, maxLen, params);
+ }
};
inline std::unique_ptr<IArrayBuilder> MakeArrayBuilder(
diff --git a/yql/essentials/public/udf/arrow/block_item.h b/yql/essentials/public/udf/arrow/block_item.h
index 2f9784cd3c..79686b3094 100644
--- a/yql/essentials/public/udf/arrow/block_item.h
+++ b/yql/essentials/public/udf/arrow/block_item.h
@@ -166,6 +166,18 @@ public:
return &Raw;
}
+ static inline TBlockItem Void() {
+ TBlockItem v;
+ v.Raw.Simple.Meta = static_cast<ui8>(EMarkers::Embedded);
+ return v;
+ }
+
+ static inline TBlockItem Zero() {
+ TBlockItem v;
+ v.Raw.Simple.Meta = static_cast<ui8>(EMarkers::Embedded);
+ return v;
+ }
+
inline const void* GetRawPtr() const
{
return &Raw;
diff --git a/yql/essentials/public/udf/arrow/block_item_comparator.h b/yql/essentials/public/udf/arrow/block_item_comparator.h
index e185b63f66..ad803799c6 100644
--- a/yql/essentials/public/udf/arrow/block_item_comparator.h
+++ b/yql/essentials/public/udf/arrow/block_item_comparator.h
@@ -169,6 +169,24 @@ public:
}
};
+class TSingularTypeBlockItemComparator: public TBlockItemComparatorBase<TSingularTypeBlockItemComparator, /*Nullable=*/false> {
+public:
+ i64 DoCompare(TBlockItem lhs, TBlockItem rhs) const {
+ Y_UNUSED(lhs, rhs);
+ return 0;
+ }
+
+ bool DoEquals(TBlockItem lhs, TBlockItem rhs) const {
+ Y_UNUSED(lhs, rhs);
+ return true;
+ }
+
+ bool DoLess(TBlockItem lhs, TBlockItem rhs) const {
+ Y_UNUSED(lhs, rhs);
+ return false;
+ }
+};
+
template<typename TTzType, bool Nullable>
class TTzDateBlockItemComparator : public TBlockItemComparatorBase<TTzDateBlockItemComparator<TTzType, Nullable>, Nullable> {
using TLayout = typename TDataType<TTzType>::TLayout;
diff --git a/yql/essentials/public/udf/arrow/block_item_hasher.h b/yql/essentials/public/udf/arrow/block_item_hasher.h
index 3f77e27b6f..9108d7b06e 100644
--- a/yql/essentials/public/udf/arrow/block_item_hasher.h
+++ b/yql/essentials/public/udf/arrow/block_item_hasher.h
@@ -76,6 +76,14 @@ public:
}
};
+class TSingularTypeBlockItemHaser : public TBlockItemHasherBase<TSingularTypeBlockItemHaser, /*Nullable=*/false> {
+public:
+ ui64 DoHash(TBlockItem value) const {
+ Y_UNUSED(value);
+ return 0;
+ }
+};
+
template <bool Nullable>
class TTupleBlockItemHasher : public TBlockItemHasherBase<TTupleBlockItemHasher<Nullable>, Nullable> {
public:
diff --git a/yql/essentials/public/udf/arrow/block_reader.h b/yql/essentials/public/udf/arrow/block_reader.h
index 05dd3ce440..6652df2ac6 100644
--- a/yql/essentials/public/udf/arrow/block_reader.h
+++ b/yql/essentials/public/udf/arrow/block_reader.h
@@ -424,6 +424,48 @@ private:
TFixedSizeBlockReader<ui16, /* Nullable */false> TimezoneReader_;
};
+// NOTE: For any singular type we use arrow::null() data type.
+// This data type DOES NOT support bit mask so for optional type
+// we have to use |TExternalOptional| wrapper.
+class TSingularTypeBlockReader: public IBlockReader {
+public:
+ TSingularTypeBlockReader() = default;
+
+ ~TSingularTypeBlockReader() override = default;
+
+ TBlockItem GetItem(const arrow::ArrayData& data, size_t index) override {
+ Y_UNUSED(data, index);
+ return TBlockItem::Zero();
+ }
+
+ TBlockItem GetScalarItem(const arrow::Scalar& scalar) override {
+ Y_UNUSED(scalar);
+ return TBlockItem::Zero();
+ }
+
+ ui64 GetDataWeight(const arrow::ArrayData& data) const override {
+ Y_UNUSED(data);
+ return 0;
+ }
+
+ ui64 GetDataWeight(TBlockItem item) const override {
+ Y_UNUSED(item);
+ return 0;
+ }
+
+ ui64 GetDefaultValueWeight() const override {
+ return 0;
+ }
+
+ void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const override {
+ Y_UNUSED(index, data, out);
+ }
+
+ void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const override {
+ Y_UNUSED(scalar, out);
+ }
+};
+
class TExternalOptionalBlockReader final : public IBlockReader {
public:
TExternalOptionalBlockReader(std::unique_ptr<IBlockReader>&& inner)
@@ -498,6 +540,7 @@ struct TReaderTraits {
using TResource = TResourceBlockReader<Nullable>;
template<typename TTzDate, bool Nullable>
using TTzDateReader = TTzDateBlockReader<TTzDate, Nullable>;
+ using TSingularType = TSingularTypeBlockReader;
constexpr static bool PassType = false;
@@ -518,6 +561,10 @@ struct TReaderTraits {
}
}
+ static std::unique_ptr<TResult> MakeSingular() {
+ return std::make_unique<TSingularType>();
+ }
+
template<typename TTzDate>
static std::unique_ptr<TResult> MakeTzDate(bool isOptional) {
if (isOptional) {
@@ -595,6 +642,10 @@ inline void UpdateBlockItemSerializeProps(const ITypeInfoHelper& typeInfoHelper,
return;
}
+ if (IsSingularType(typeInfoHelper, type)) {
+ return;
+ }
+
Y_ENSURE(false, "Unsupported type");
}
diff --git a/yql/essentials/public/udf/arrow/dispatch_traits.h b/yql/essentials/public/udf/arrow/dispatch_traits.h
index 88c303cc87..87c25b93f5 100644
--- a/yql/essentials/public/udf/arrow/dispatch_traits.h
+++ b/yql/essentials/public/udf/arrow/dispatch_traits.h
@@ -1,5 +1,6 @@
#pragma once
+#include <yql/essentials/public/udf/arrow/util.h>
#include <yql/essentials/public/udf/udf_type_inspection.h>
#include <yql/essentials/public/udf/udf_value_builder.h>
@@ -85,8 +86,7 @@ std::unique_ptr<typename TTraits::TResult> DispatchByArrowTraits(const ITypeInfo
TOptionalTypeInspector unpackedOpt(typeInfoHelper, unpacked);
TPgTypeInspector unpackedPg(typeInfoHelper, unpacked);
- if (unpackedOpt || typeOpt && unpackedPg) {
- // at least 2 levels of optionals
+ if (unpackedOpt || (typeOpt && NeedWrapWithExternalOptional(typeInfoHelper, unpacked))) {
ui32 nestLevel = 0;
auto currentType = type;
auto previousType = type;
@@ -103,7 +103,7 @@ std::unique_ptr<typename TTraits::TResult> DispatchByArrowTraits(const ITypeInfo
}
}
- if (TPgTypeInspector(typeInfoHelper, currentType)) {
+ if (NeedWrapWithExternalOptional(typeInfoHelper, currentType)) {
previousType = currentType;
++nestLevel;
}
@@ -118,8 +118,7 @@ std::unique_ptr<typename TTraits::TResult> DispatchByArrowTraits(const ITypeInfo
}
return reader;
- }
- else {
+ } else {
type = unpacked;
}
@@ -230,6 +229,15 @@ std::unique_ptr<typename TTraits::TResult> DispatchByArrowTraits(const ITypeInfo
}
}
+ if (IsSingularType(typeInfoHelper, type)) {
+ Y_ENSURE(!isOptional, "Optional data types are not supported directly for singular type. Please use TExternalOptional wrapper.");
+ if constexpr (TTraits::PassType) {
+ return TTraits::MakeSingular(type, std::forward<TArgs>(args)...);
+ } else {
+ return TTraits::MakeSingular(std::forward<TArgs>(args)...);
+ }
+ }
+
Y_ENSURE(false, "Unsupported type");
}
diff --git a/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp b/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp
index bbb4c134c8..d0851c5e86 100644
--- a/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp
+++ b/yql/essentials/public/udf/arrow/ut/array_builder_ut.cpp
@@ -220,6 +220,46 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) {
UNIT_ASSERT_VALUES_EQUAL(item2AfterRead.GetStringRefFromValue(), "234");
}
+ Y_UNIT_TEST(TestSingularTypeValueBuilderReader) {
+ TArrayBuilderTestData data;
+ const auto nullType = data.PgmBuilder.NewNullType();
+
+ std::shared_ptr<arrow::ArrayData> arrayData = arrow::NullArray{42}.data();
+ IArrayBuilder::TArrayDataItem arrayDataItem = {.Data = arrayData.get(), .StartOffset = 0};
+ {
+ const auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), nullType, *data.ArrowPool, MAX_BLOCK_SIZE, /*pgBuilder=*/nullptr);
+ // Check builder.
+ arrayBuilder->Add(TUnboxedValuePod::Zero());
+ arrayBuilder->Add(TBlockItem::Zero());
+ arrayBuilder->Add(TBlockItem::Zero(), 4);
+ TInputBuffer inputBuffer("Just arbitrary string");
+ arrayBuilder->Add(inputBuffer);
+ arrayBuilder->AddMany(*arrayData, /*popCount=*/3u, /*sparseBitmat=*/nullptr, /*bitmapSize=*/arrayData->length);
+ arrayBuilder->AddMany(&arrayDataItem, /*arrayCount=*/1, /*beginIndex=*/1, /*count=*/3u);
+ std::vector<ui64> indexes = {1, 5, 7, 10};
+ arrayBuilder->AddMany(&arrayDataItem, /*arrayCount=*/1, /*beginIndex=*/indexes.data(), /*count=*/4u);
+ UNIT_ASSERT_VALUES_EQUAL(arrayBuilder->Build(true).array()->length, 1 + 1 + 4 + 1 + 3 + 3 + 4);
+ }
+
+ {
+ // Check reader.
+ const auto blockReader = MakeBlockReader(NMiniKQL::TTypeInfoHelper(), nullType);
+
+ UNIT_ASSERT(blockReader->GetItem(*arrayData, 0));
+ UNIT_ASSERT(blockReader->GetScalarItem(arrow::Scalar(arrow::null())));
+ UNIT_ASSERT_EQUAL(blockReader->GetDataWeight(*arrayData), 0);
+ UNIT_ASSERT_EQUAL(blockReader->GetDataWeight(TBlockItem::Zero()), 0);
+ UNIT_ASSERT_EQUAL(blockReader->GetDefaultValueWeight(), 0);
+ UNIT_ASSERT_EQUAL(blockReader->GetDefaultValueWeight(), 0);
+
+ TOutputBuffer outputBuffer;
+ blockReader->SaveItem(*arrayData, 1, outputBuffer);
+ UNIT_ASSERT(outputBuffer.Finish().empty());
+ blockReader->SaveScalarItem(arrow::Scalar(arrow::null()), outputBuffer);
+ UNIT_ASSERT(outputBuffer.Finish().empty());
+ }
+ }
+
Y_UNIT_TEST(TestBuilderAllocatedSize) {
TArrayBuilderTestData data;
const auto optStringType = data.PgmBuilder.NewDataType(NUdf::EDataSlot::String, true);
diff --git a/yql/essentials/public/udf/arrow/util.h b/yql/essentials/public/udf/arrow/util.h
index f7bdb715f9..e899af26af 100644
--- a/yql/essentials/public/udf/arrow/util.h
+++ b/yql/essentials/public/udf/arrow/util.h
@@ -12,6 +12,9 @@
#include <functional>
+#include <yql/essentials/public/udf/udf_type_inspection.h>
+#include <yql/essentials/public/udf/udf_types.h>
+
namespace NYql {
namespace NUdf {
@@ -236,5 +239,17 @@ inline void ZeroMemoryContext(void* ptr) {
SetMemoryContext(ptr, nullptr);
}
+inline bool IsSingularType(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
+ auto kind = typeInfoHelper.GetTypeKind(type);
+ return kind == ETypeKind::Null ||
+ kind == ETypeKind::Void ||
+ kind == ETypeKind::EmptyDict ||
+ kind == ETypeKind::EmptyList;
+}
+
+inline bool NeedWrapWithExternalOptional(const ITypeInfoHelper& typeInfoHelper, const TType* type) {
+ return TPgTypeInspector(typeInfoHelper, type) || IsSingularType(typeInfoHelper, type);
+}
+
} // namespace NUdf
} // namespace NYql
diff --git a/yql/essentials/sql/settings/translation_settings.h b/yql/essentials/sql/settings/translation_settings.h
index 8c6e716343..98d537038e 100644
--- a/yql/essentials/sql/settings/translation_settings.h
+++ b/yql/essentials/sql/settings/translation_settings.h
@@ -129,6 +129,7 @@ namespace NSQLTranslation {
NYql::IAutoParamBuilderFactory* AutoParamBuilderFactory = nullptr;
bool EmitReadsForExists = false;
bool AlwaysAllowExports = false;
+ bool IsReplay = false;
};
bool ParseTranslationSettings(const TString& query, NSQLTranslation::TTranslationSettings& settings, NYql::TIssues& issues);
diff --git a/yql/essentials/sql/v1/context.cpp b/yql/essentials/sql/v1/context.cpp
index 9790e931dc..b72c673b0e 100644
--- a/yql/essentials/sql/v1/context.cpp
+++ b/yql/essentials/sql/v1/context.cpp
@@ -100,6 +100,7 @@ TContext::TContext(const TLexers& lexers, const TParsers& parsers,
, HasPendingErrors(false)
, DqEngineEnable(Settings.DqDefaultAuto->Allow())
, AnsiQuotedIdentifiers(settings.AnsiLexer)
+ , WarningPolicy(settings.IsReplay)
, BlockEngineEnable(Settings.BlockDefaultAuto->Allow())
{
for (auto lib : settings.Libraries) {
diff --git a/yql/essentials/tests/sql/minirun/part0/canondata/result.json b/yql/essentials/tests/sql/minirun/part0/canondata/result.json
index 3c1aa86fec..ffeebb57cc 100644
--- a/yql/essentials/tests/sql/minirun/part0/canondata/result.json
+++ b/yql/essentials/tests/sql/minirun/part0/canondata/result.json
@@ -275,6 +275,27 @@
"uri": "https://{canondata_backend}/1936842/8073eb626dd657fcbe20d34185c363a1a18c3e7c/resource.tar.gz#test.test_blocks-agg_all_mixed_distinct-default.txt-Results_/results.txt"
}
],
+ "test.test[blocks-agg_singular_type_key_optional-default.txt-Debug]": [
+ {
+ "checksum": "71ee94512d6ef28833fb6df3bace7b53",
+ "size": 2727,
+ "uri": "https://{canondata_backend}/1925842/7e03c084910acb6d9d50a1f7dc65eda3cdac3b45/resource.tar.gz#test.test_blocks-agg_singular_type_key_optional-default.txt-Debug_/opt.yql"
+ }
+ ],
+ "test.test[blocks-agg_singular_type_key_optional-default.txt-Peephole]": [
+ {
+ "checksum": "db2e4bd6530b31b6efceb77a4a184b4e",
+ "size": 6606,
+ "uri": "https://{canondata_backend}/1925842/7e03c084910acb6d9d50a1f7dc65eda3cdac3b45/resource.tar.gz#test.test_blocks-agg_singular_type_key_optional-default.txt-Peephole_/opt.yql"
+ }
+ ],
+ "test.test[blocks-agg_singular_type_key_optional-default.txt-Results]": [
+ {
+ "checksum": "4b79ad0d41612ad09d735f34513ee6ff",
+ "size": 7301,
+ "uri": "https://{canondata_backend}/1925842/7e03c084910acb6d9d50a1f7dc65eda3cdac3b45/resource.tar.gz#test.test_blocks-agg_singular_type_key_optional-default.txt-Results_/results.txt"
+ }
+ ],
"test.test[blocks-and-default.txt-Debug]": [
{
"checksum": "47525fa40526e04498f0c41e6bc48f59",
diff --git a/yql/essentials/tests/sql/minirun/part2/canondata/result.json b/yql/essentials/tests/sql/minirun/part2/canondata/result.json
index 48c0652311..73ff7dcd87 100644
--- a/yql/essentials/tests/sql/minirun/part2/canondata/result.json
+++ b/yql/essentials/tests/sql/minirun/part2/canondata/result.json
@@ -258,6 +258,27 @@
"uri": "https://{canondata_backend}/1937150/3d01c6ab2777fc3b99338655d39a5bcbb1ac89c3/resource.tar.gz#test.test_blocks-agg_by_key_only_distinct-default.txt-Results_/results.txt"
}
],
+ "test.test[blocks-agg_singular_type_value_optional-default.txt-Debug]": [
+ {
+ "checksum": "06774e6dab64198fc6cc5d173b0bba26",
+ "size": 2781,
+ "uri": "https://{canondata_backend}/1781765/b8d92d6ccf46e436b2e5b3b70ab511bab6d820b0/resource.tar.gz#test.test_blocks-agg_singular_type_value_optional-default.txt-Debug_/opt.yql"
+ }
+ ],
+ "test.test[blocks-agg_singular_type_value_optional-default.txt-Peephole]": [
+ {
+ "checksum": "d22ed37889eea3b41eadb6164bf6d017",
+ "size": 3113,
+ "uri": "https://{canondata_backend}/1781765/b8d92d6ccf46e436b2e5b3b70ab511bab6d820b0/resource.tar.gz#test.test_blocks-agg_singular_type_value_optional-default.txt-Peephole_/opt.yql"
+ }
+ ],
+ "test.test[blocks-agg_singular_type_value_optional-default.txt-Results]": [
+ {
+ "checksum": "98fbeb83e5295954045efa6fd159626f",
+ "size": 5301,
+ "uri": "https://{canondata_backend}/1781765/b8d92d6ccf46e436b2e5b3b70ab511bab6d820b0/resource.tar.gz#test.test_blocks-agg_singular_type_value_optional-default.txt-Results_/results.txt"
+ }
+ ],
"test.test[blocks-exists-default.txt-Debug]": [
{
"checksum": "a871029504a6d3f1c07342493b86d28d",
diff --git a/yql/essentials/tests/sql/minirun/part7/canondata/result.json b/yql/essentials/tests/sql/minirun/part7/canondata/result.json
index 399f6d226b..d018f1582d 100644
--- a/yql/essentials/tests/sql/minirun/part7/canondata/result.json
+++ b/yql/essentials/tests/sql/minirun/part7/canondata/result.json
@@ -220,7 +220,7 @@
{
"checksum": "02e80809d3cbf91101d09d4ac1e87aa0",
"size": 623,
- "uri": "https://{canondata_backend}/1917492/b01930df0710eb10e4ce2d35cddca6be33ac8a9f/resource.tar.gz#test.test_blocks-as_tuple-default.txt-Peephole_/opt.yql"
+ "uri": "https://{canondata_backend}/1130705/f9eb075ce8fc54a57832e4ee918669601325c133/resource.tar.gz#test.test_blocks-as_tuple-default.txt-Peephole_/opt.yql"
}
],
"test.test[blocks-as_tuple-default.txt-Results]": [
diff --git a/yql/essentials/tests/sql/minirun/part8/canondata/result.json b/yql/essentials/tests/sql/minirun/part8/canondata/result.json
index 6d3cbc281d..94d8c6547e 100644
--- a/yql/essentials/tests/sql/minirun/part8/canondata/result.json
+++ b/yql/essentials/tests/sql/minirun/part8/canondata/result.json
@@ -352,6 +352,48 @@
"uri": "https://{canondata_backend}/1031349/4d0c6ce1905689c65e264d15d770d36efcd9426f/resource.tar.gz#test.test_binding-named_expr_input-default.txt-Results_/results.txt"
}
],
+ "test.test[blocks-agg_singular_type_key-default.txt-Debug]": [
+ {
+ "checksum": "b97d36400fafea8f8f6670954d7ac139",
+ "size": 2685,
+ "uri": "https://{canondata_backend}/1936273/07450a3416f3c728f9a8a8fdde6e5f5a0ca2d9a6/resource.tar.gz#test.test_blocks-agg_singular_type_key-default.txt-Debug_/opt.yql"
+ }
+ ],
+ "test.test[blocks-agg_singular_type_key-default.txt-Peephole]": [
+ {
+ "checksum": "f77f6136a2995c5b0e0ae3ed00274d36",
+ "size": 6564,
+ "uri": "https://{canondata_backend}/1936273/07450a3416f3c728f9a8a8fdde6e5f5a0ca2d9a6/resource.tar.gz#test.test_blocks-agg_singular_type_key-default.txt-Peephole_/opt.yql"
+ }
+ ],
+ "test.test[blocks-agg_singular_type_key-default.txt-Results]": [
+ {
+ "checksum": "e2233558149bd3009f7f16412bf4838a",
+ "size": 6117,
+ "uri": "https://{canondata_backend}/1936273/07450a3416f3c728f9a8a8fdde6e5f5a0ca2d9a6/resource.tar.gz#test.test_blocks-agg_singular_type_key-default.txt-Results_/results.txt"
+ }
+ ],
+ "test.test[blocks-agg_singular_type_value-default.txt-Debug]": [
+ {
+ "checksum": "4733abf71c9c62e30af77c6490d59334",
+ "size": 2358,
+ "uri": "https://{canondata_backend}/1130705/a25045513209436069d9f9a29831b732c13e1675/resource.tar.gz#test.test_blocks-agg_singular_type_value-default.txt-Debug_/opt.yql"
+ }
+ ],
+ "test.test[blocks-agg_singular_type_value-default.txt-Peephole]": [
+ {
+ "checksum": "b34c4e8ca42e6232ef03f2ffeb05fd83",
+ "size": 3013,
+ "uri": "https://{canondata_backend}/1130705/a25045513209436069d9f9a29831b732c13e1675/resource.tar.gz#test.test_blocks-agg_singular_type_value-default.txt-Peephole_/opt.yql"
+ }
+ ],
+ "test.test[blocks-agg_singular_type_value-default.txt-Results]": [
+ {
+ "checksum": "4718805e72274809e4ac6c07ee8dfd7d",
+ "size": 3489,
+ "uri": "https://{canondata_backend}/1130705/a25045513209436069d9f9a29831b732c13e1675/resource.tar.gz#test.test_blocks-agg_singular_type_value-default.txt-Results_/results.txt"
+ }
+ ],
"test.test[blocks-and_scalar-default.txt-Debug]": [
{
"checksum": "e5ccc5c53756e09ded8e82b6d662e5e9",
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/result.json b/yql/essentials/tests/sql/sql2yql/canondata/result.json
index f345b0e389..0c9c63e161 100644
--- a/yql/essentials/tests/sql/sql2yql/canondata/result.json
+++ b/yql/essentials/tests/sql/sql2yql/canondata/result.json
@@ -1378,6 +1378,34 @@
"uri": "https://{canondata_backend}/1917492/7dd4bc86433f6173a26b62397e1ef41fa9471945/resource.tar.gz#test_sql2yql.test_blocks-agg_by_key_only_distinct_/sql.yql"
}
],
+ "test_sql2yql.test[blocks-agg_singular_type_key]": [
+ {
+ "checksum": "7cae7f556775597a0b451a875e77a1df",
+ "size": 7636,
+ "uri": "https://{canondata_backend}/1784117/5ff6ff6c0808bf39612567f492af1bc2db36da20/resource.tar.gz#test_sql2yql.test_blocks-agg_singular_type_key_/sql.yql"
+ }
+ ],
+ "test_sql2yql.test[blocks-agg_singular_type_key_optional]": [
+ {
+ "checksum": "a3f91d7949791561f4972eafc1610499",
+ "size": 7678,
+ "uri": "https://{canondata_backend}/1781765/9e1dc7f8aa95db55a59c09f397a0634224d08363/resource.tar.gz#test_sql2yql.test_blocks-agg_singular_type_key_optional_/sql.yql"
+ }
+ ],
+ "test_sql2yql.test[blocks-agg_singular_type_value]": [
+ {
+ "checksum": "cd58c3714a9d215fd1f4bea4e36f37a2",
+ "size": 3634,
+ "uri": "https://{canondata_backend}/1781765/9e1dc7f8aa95db55a59c09f397a0634224d08363/resource.tar.gz#test_sql2yql.test_blocks-agg_singular_type_value_/sql.yql"
+ }
+ ],
+ "test_sql2yql.test[blocks-agg_singular_type_value_optional]": [
+ {
+ "checksum": "5ae70db766241594bfea9edd5e5dec34",
+ "size": 3676,
+ "uri": "https://{canondata_backend}/1781765/0dce37dc71c65fe553d73ed7cf98a62bdee9ddee/resource.tar.gz#test_sql2yql.test_blocks-agg_singular_type_value_optional_/sql.yql"
+ }
+ ],
"test_sql2yql.test[blocks-and]": [
{
"checksum": "e22a52b51ef20174c3b832acb09df01b",
@@ -1410,7 +1438,7 @@
{
"checksum": "601f02d489707b615a9ff16a4fe1d3f5",
"size": 1304,
- "uri": "https://{canondata_backend}/1900335/c447765ddbde200b8fe3ee8091f4d625b36b6bc6/resource.tar.gz#test_sql2yql.test_blocks-as_tuple_/sql.yql"
+ "uri": "https://{canondata_backend}/1784826/bb2033aff3202d2b68e04361e6d1bacbf4cbbed6/resource.tar.gz#test_sql2yql.test_blocks-as_tuple_/sql.yql"
}
],
"test_sql2yql.test[blocks-coalesce]": [
@@ -8299,6 +8327,26 @@
"uri": "file://test_sql_format.test_blocks-agg_by_key_only_distinct_/formatted.sql"
}
],
+ "test_sql_format.test[blocks-agg_singular_type_key]": [
+ {
+ "uri": "file://test_sql_format.test_blocks-agg_singular_type_key_/formatted.sql"
+ }
+ ],
+ "test_sql_format.test[blocks-agg_singular_type_key_optional]": [
+ {
+ "uri": "file://test_sql_format.test_blocks-agg_singular_type_key_optional_/formatted.sql"
+ }
+ ],
+ "test_sql_format.test[blocks-agg_singular_type_value]": [
+ {
+ "uri": "file://test_sql_format.test_blocks-agg_singular_type_value_/formatted.sql"
+ }
+ ],
+ "test_sql_format.test[blocks-agg_singular_type_value_optional]": [
+ {
+ "uri": "file://test_sql_format.test_blocks-agg_singular_type_value_optional_/formatted.sql"
+ }
+ ],
"test_sql_format.test[blocks-and]": [
{
"uri": "file://test_sql_format.test_blocks-and_/formatted.sql"
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_key_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_key_/formatted.sql
new file mode 100644
index 0000000000..fd6c96da8f
--- /dev/null
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_key_/formatted.sql
@@ -0,0 +1,72 @@
+PRAGMA config.flags('PeepholeFlags', 'UseAggPhases');
+
+$n = 3;
+
+$data = ListMap(
+ ListFromRange(1, $n), ($x) -> (
+ <|
+ idx: $x,
+ empty_list: [],
+ empty_dict: {},
+ nil: NULL,
+ val: $x + 5,
+ vid: Void(),
+ emtpy_tuple: AsTuple(),
+ empty_struct: AsStruct()
+ |>
+ )
+);
+
+SELECT
+ empty_list,
+ SOME(idx)
+FROM
+ as_table($data)
+GROUP BY
+ empty_list
+;
+
+SELECT
+ empty_dict,
+ SOME(idx)
+FROM
+ as_table($data)
+GROUP BY
+ empty_dict
+;
+
+SELECT
+ nil,
+ SOME(idx)
+FROM
+ as_table($data)
+GROUP BY
+ nil
+;
+
+SELECT
+ vid,
+ SOME(idx)
+FROM
+ as_table($data)
+GROUP BY
+ vid
+;
+
+SELECT
+ emtpy_tuple,
+ SOME(idx)
+FROM
+ as_table($data)
+GROUP BY
+ emtpy_tuple
+;
+
+SELECT
+ empty_struct,
+ SOME(idx)
+FROM
+ as_table($data)
+GROUP BY
+ empty_struct
+;
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_key_optional_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_key_optional_/formatted.sql
new file mode 100644
index 0000000000..401f8117f5
--- /dev/null
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_key_optional_/formatted.sql
@@ -0,0 +1,72 @@
+PRAGMA config.flags('PeepholeFlags', 'UseAggPhases');
+
+$n = 3;
+
+$data = ListMap(
+ ListFromRange(1, $n), ($x) -> (
+ <|
+ idx: $x,
+ empty_list: Just([]),
+ empty_dict: Just({}),
+ nil: Just(NULL),
+ val: $x + 5,
+ vid: Just(Void()),
+ emtpy_tuple: Just(AsTuple()),
+ empty_struct: Just(AsStruct())
+ |>
+ )
+);
+
+SELECT
+ empty_list,
+ SOME(idx)
+FROM
+ as_table($data)
+GROUP BY
+ empty_list
+;
+
+SELECT
+ empty_dict,
+ SOME(idx)
+FROM
+ as_table($data)
+GROUP BY
+ empty_dict
+;
+
+SELECT
+ nil,
+ SOME(idx)
+FROM
+ as_table($data)
+GROUP BY
+ nil
+;
+
+SELECT
+ vid,
+ SOME(idx)
+FROM
+ as_table($data)
+GROUP BY
+ vid
+;
+
+SELECT
+ emtpy_tuple,
+ SOME(idx)
+FROM
+ as_table($data)
+GROUP BY
+ emtpy_tuple
+;
+
+SELECT
+ empty_struct,
+ SOME(idx)
+FROM
+ as_table($data)
+GROUP BY
+ empty_struct
+;
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_value_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_value_/formatted.sql
new file mode 100644
index 0000000000..f8836f06f6
--- /dev/null
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_value_/formatted.sql
@@ -0,0 +1,33 @@
+PRAGMA config.flags('PeepholeFlags', 'UseAggPhases');
+
+$n = 3;
+
+$data = ListMap(
+ ListFromRange(1, $n), ($x) -> (
+ <|
+ idx: $x,
+ empty_list: [],
+ empty_dict: {},
+ nil: NULL,
+ val: $x + 5,
+ vid: Void(),
+ emtpy_tuple: AsTuple(),
+ empty_struct: AsStruct()
+ |>
+ )
+);
+
+SELECT
+ idx,
+ SOME(empty_dict),
+ SOME(empty_list),
+ SOME(nil),
+ SOME(empty_dict),
+ SOME(vid),
+ SOME(emtpy_tuple),
+ SOME(empty_struct),
+FROM
+ as_table($data)
+GROUP BY
+ idx
+;
diff --git a/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_value_optional_/formatted.sql b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_value_optional_/formatted.sql
new file mode 100644
index 0000000000..258de29fb0
--- /dev/null
+++ b/yql/essentials/tests/sql/sql2yql/canondata/test_sql_format.test_blocks-agg_singular_type_value_optional_/formatted.sql
@@ -0,0 +1,33 @@
+PRAGMA config.flags('PeepholeFlags', 'UseAggPhases');
+
+$n = 3;
+
+$data = ListMap(
+ ListFromRange(1, $n), ($x) -> (
+ <|
+ idx: $x,
+ empty_list: Just([]),
+ empty_dict: Just({}),
+ nil: Just(NULL),
+ val: $x + 5,
+ vid: Just(Void()),
+ emtpy_tuple: Just(AsTuple()),
+ empty_struct: Just(AsStruct())
+ |>
+ )
+);
+
+SELECT
+ idx,
+ SOME(empty_dict),
+ SOME(empty_list),
+ SOME(nil),
+ SOME(empty_dict),
+ SOME(vid),
+ SOME(emtpy_tuple),
+ SOME(empty_struct),
+FROM
+ as_table($data)
+GROUP BY
+ idx
+;
diff --git a/yql/essentials/tests/sql/suites/blocks/agg_singular_type_key.sql b/yql/essentials/tests/sql/suites/blocks/agg_singular_type_key.sql
new file mode 100644
index 0000000000..34c1f31962
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/blocks/agg_singular_type_key.sql
@@ -0,0 +1,23 @@
+PRAGMA config.flags('PeepholeFlags', 'UseAggPhases');
+
+$n = 3;
+$data = ListMap(ListFromRange(1, $n), ($x) -> (<|idx: $x,
+ empty_list: [],
+ empty_dict: {},
+ nil: NULL,
+ val: $x + 5,
+ vid: Void(),
+ emtpy_tuple: AsTuple(),
+ empty_struct: AsStruct()|>));
+
+SELECT empty_list, SOME(idx) FROM as_table($data) GROUP BY empty_list;
+
+SELECT empty_dict, SOME(idx) FROM as_table($data) GROUP BY empty_dict;
+
+SELECT nil, SOME(idx) FROM as_table($data) GROUP BY nil;
+
+SELECT vid, SOME(idx) FROM as_table($data) GROUP BY vid;
+
+SELECT emtpy_tuple, SOME(idx) FROM as_table($data) GROUP BY emtpy_tuple;
+
+SELECT empty_struct, SOME(idx) FROM as_table($data) GROUP BY empty_struct;
diff --git a/yql/essentials/tests/sql/suites/blocks/agg_singular_type_key_optional.sql b/yql/essentials/tests/sql/suites/blocks/agg_singular_type_key_optional.sql
new file mode 100644
index 0000000000..0b86e48184
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/blocks/agg_singular_type_key_optional.sql
@@ -0,0 +1,24 @@
+PRAGMA config.flags('PeepholeFlags', 'UseAggPhases');
+
+$n = 3;
+
+$data = ListMap(ListFromRange(1, $n), ($x) -> (<|idx: $x,
+ empty_list: Just([]),
+ empty_dict: Just({}),
+ nil: Just(NULL),
+ val: $x + 5,
+ vid: Just(Void()),
+ emtpy_tuple: Just(AsTuple()),
+ empty_struct: Just(AsStruct())|>));
+
+SELECT empty_list, SOME(idx) FROM as_table($data) GROUP BY empty_list;
+
+SELECT empty_dict, SOME(idx) FROM as_table($data) GROUP BY empty_dict;
+
+SELECT nil, SOME(idx) FROM as_table($data) GROUP BY nil;
+
+SELECT vid, SOME(idx) FROM as_table($data) GROUP BY vid;
+
+SELECT emtpy_tuple, SOME(idx) FROM as_table($data) GROUP BY emtpy_tuple;
+
+SELECT empty_struct, SOME(idx) FROM as_table($data) GROUP BY empty_struct;
diff --git a/yql/essentials/tests/sql/suites/blocks/agg_singular_type_value.sql b/yql/essentials/tests/sql/suites/blocks/agg_singular_type_value.sql
new file mode 100644
index 0000000000..ac290e2d25
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/blocks/agg_singular_type_value.sql
@@ -0,0 +1,26 @@
+PRAGMA config.flags('PeepholeFlags', 'UseAggPhases');
+
+$n = 3;
+$data = ListMap(ListFromRange(1, $n), ($x) -> (<|idx: $x,
+ empty_list: [],
+ empty_dict: {},
+ nil: NULL,
+ val: $x + 5,
+ vid: Void(),
+ emtpy_tuple: AsTuple(),
+ empty_struct: AsStruct()|>));
+
+SELECT
+ idx,
+ SOME(empty_dict),
+ SOME(empty_list),
+ SOME(nil),
+ SOME(empty_dict),
+ SOME(vid),
+ SOME(emtpy_tuple),
+ SOME(empty_struct),
+FROM
+ as_table($data)
+GROUP BY
+ idx
+;
diff --git a/yql/essentials/tests/sql/suites/blocks/agg_singular_type_value_optional.sql b/yql/essentials/tests/sql/suites/blocks/agg_singular_type_value_optional.sql
new file mode 100644
index 0000000000..3214db1049
--- /dev/null
+++ b/yql/essentials/tests/sql/suites/blocks/agg_singular_type_value_optional.sql
@@ -0,0 +1,27 @@
+PRAGMA config.flags('PeepholeFlags', 'UseAggPhases');
+
+$n = 3;
+$data = ListMap(ListFromRange(1, $n), ($x) -> (<|idx: $x,
+ empty_list: Just([]),
+ empty_dict: Just({}),
+ nil: Just(NULL),
+ val: $x + 5,
+ vid: Just(Void()),
+ emtpy_tuple: Just(AsTuple()),
+ empty_struct: Just(AsStruct())|>));
+
+SELECT
+ idx,
+ SOME(empty_dict),
+ SOME(empty_list),
+ SOME(nil),
+ SOME(empty_dict),
+ SOME(vid),
+ SOME(emtpy_tuple),
+ SOME(empty_struct),
+FROM
+ as_table($data)
+GROUP BY
+ idx
+;
+
diff --git a/yql/essentials/types/binary_json/ut/entry_ut.cpp b/yql/essentials/types/binary_json/ut/entry_ut.cpp
index 0f099ed59d..247fc5034c 100644
--- a/yql/essentials/types/binary_json/ut/entry_ut.cpp
+++ b/yql/essentials/types/binary_json/ut/entry_ut.cpp
@@ -17,6 +17,7 @@ public:
UNIT_TEST(TestGetContainer);
UNIT_TEST(TestGetString);
UNIT_TEST(TestGetNumber);
+ UNIT_TEST(TestOutOfBounds);
UNIT_TEST_SUITE_END();
void TestGetType() {
@@ -93,6 +94,28 @@ public:
UNIT_ASSERT_VALUES_EQUAL(container.GetElement(0).GetNumber(), testCase.second);
}
}
+
+ void TestOutOfBounds() {
+ const TVector<std::pair<TString, double>> testCases = {
+ { "1e100000000", std::numeric_limits<double>::infinity() },
+ { "-1e100000000", -std::numeric_limits<double>::infinity() },
+ { "1.797693135e+308", std::numeric_limits<double>::infinity() },
+ { "-1.797693135e+308", -std::numeric_limits<double>::infinity() },
+ };
+
+ for (const auto& testCase : testCases) {
+ UNIT_ASSERT(std::holds_alternative<TString>(SerializeToBinaryJson(testCase.first)));
+ }
+
+ for (const auto& testCase : testCases) {
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.first, true));
+ const auto reader = TBinaryJsonReader::Make(binaryJson);
+ const auto container = reader->GetRootCursor();
+
+ UNIT_ASSERT_VALUES_EQUAL(container.GetElement(0).GetNumber(), testCase.second);
+ }
+ }
};
UNIT_TEST_SUITE_REGISTRATION(TBinaryJsonEntryTest);
+
diff --git a/yql/essentials/types/binary_json/write.cpp b/yql/essentials/types/binary_json/write.cpp
index 28dffea9db..39cd05949d 100644
--- a/yql/essentials/types/binary_json/write.cpp
+++ b/yql/essentials/types/binary_json/write.cpp
@@ -13,6 +13,8 @@
#include <util/generic/set.h>
#include <util/generic/stack.h>
#include <util/generic/vector.h>
+#include <yql/essentials/minikql/dom/node.h>
+#include <yql/essentials/utils/parse_double.h>
#include <cmath>
@@ -415,9 +417,8 @@ private:
*/
class TBinaryJsonCallbacks : public TJsonCallbacks {
public:
- TBinaryJsonCallbacks(bool throwException)
- : TJsonCallbacks(/* throwException */ throwException)
- {
+ TBinaryJsonCallbacks(bool throwException, bool allowInf)
+ : TJsonCallbacks(/* throwException */ throwException), AllowInf(allowInf) {
}
bool OnNull() override {
@@ -445,7 +446,7 @@ public:
}
bool OnDouble(double value) override {
- if (Y_UNLIKELY(std::isinf(value))) {
+ if (Y_UNLIKELY(std::isinf(value) && !AllowInf)) {
if (ThrowException) {
ythrow yexception() << "JSON number is infinite";
} else {
@@ -492,6 +493,7 @@ public:
private:
TJsonIndex Json;
+ bool AllowInf;
};
void DomToJsonIndex(const NUdf::TUnboxedValue& value, TBinaryJsonCallbacks& callbacks) {
@@ -573,8 +575,14 @@ template <typename TOnDemandValue>
switch (value.get_number_type()) {
case simdjson::builtin::number_type::floating_point_number: {
double v;
- RETURN_IF_NOT_SUCCESS(value.get(v));
- callbacks.OnDouble(v);
+ if (const auto& error = value.get(v); Y_UNLIKELY(error != simdjson::SUCCESS)) {
+ if (!NYql::TryDoubleFromString((std::string_view)value.raw_json_token(), v)) {
+ return error;
+ }
+ };
+ if (Y_UNLIKELY(!callbacks.OnDouble(v))) {
+ return simdjson::error_code::NUMBER_ERROR;
+ }
break;
}
case simdjson::builtin::number_type::signed_integer: {
@@ -592,7 +600,9 @@ template <typename TOnDemandValue>
case simdjson::builtin::number_type::big_integer:
double v;
RETURN_IF_NOT_SUCCESS(value.get(v));
- callbacks.OnDouble(v);
+ if (Y_UNLIKELY(!callbacks.OnDouble(v))) {
+ return simdjson::error_code::NUMBER_ERROR;
+ }
break;
}
break;
@@ -600,7 +610,7 @@ template <typename TOnDemandValue>
case simdjson::ondemand::json_type::null: {
auto is_null = value.is_null();
RETURN_IF_NOT_SUCCESS(is_null.error());
- if (Y_UNLIKELY(!is_null.value_unsafe())) {
+ if (Y_UNLIKELY(!is_null.value_unsafe())) {
return simdjson::error_code::N_ATOM_ERROR;
}
callbacks.OnNull();
@@ -644,7 +654,7 @@ template <typename TOnDemandValue>
}
// unused, left for performance comparison
-[[maybe_unused]] [[nodiscard]] simdjson::error_code SimdJsonToJsonIndexImpl(const simdjson::dom::element& value, TBinaryJsonCallbacks& callbacks) {
+[[nodiscard]] [[maybe_unused]] simdjson::error_code SimdJsonToJsonIndexImpl(const simdjson::dom::element& value, TBinaryJsonCallbacks& callbacks) {
#define RETURN_IF_NOT_SUCCESS(status) \
if (Y_UNLIKELY(status != simdjson::SUCCESS)) { \
return status; \
@@ -715,9 +725,9 @@ template <typename TOnDemandValue>
}
}
-std::variant<TBinaryJson, TString> SerializeToBinaryJsonImpl(const TStringBuf json) {
+std::variant<TBinaryJson, TString> SerializeToBinaryJsonImpl(const TStringBuf json, bool allowInf) {
std::variant<TBinaryJson, TString> res;
- TBinaryJsonCallbacks callbacks(/* throwException */ false);
+ TBinaryJsonCallbacks callbacks(/* throwException */ false, allowInf);
const simdjson::padded_string paddedJson(json);
simdjson::ondemand::parser parser;
try {
@@ -739,15 +749,16 @@ std::variant<TBinaryJson, TString> SerializeToBinaryJsonImpl(const TStringBuf js
return res;
}
-std::variant<TBinaryJson, TString> SerializeToBinaryJson(const TStringBuf json) {
- return SerializeToBinaryJsonImpl(json);
+std::variant<TBinaryJson, TString> SerializeToBinaryJson(const TStringBuf json, bool allowInf) {
+ return SerializeToBinaryJsonImpl(json, allowInf);
}
TBinaryJson SerializeToBinaryJson(const NUdf::TUnboxedValue& value) {
- TBinaryJsonCallbacks callbacks(/* throwException */ false);
+ TBinaryJsonCallbacks callbacks(/* throwException */ false, /* allowInf */ false);
DomToJsonIndex(value, callbacks);
TBinaryJsonSerializer serializer(std::move(callbacks).GetResult());
return std::move(serializer).Serialize();
}
}
+
diff --git a/yql/essentials/types/binary_json/write.h b/yql/essentials/types/binary_json/write.h
index c4f8f89d2d..61dea9bfb4 100644
--- a/yql/essentials/types/binary_json/write.h
+++ b/yql/essentials/types/binary_json/write.h
@@ -2,23 +2,24 @@
#include "format.h"
-#include <yql/essentials/minikql/dom/node.h>
-
#include <util/generic/maybe.h>
#include <variant>
+namespace NYql::NUdf {
+class TUnboxedValue;
+};
+
namespace NKikimr::NBinaryJson {
/**
* @brief Translates textual JSON into BinaryJson
*/
-std::variant<TBinaryJson, TString> SerializeToBinaryJson(const TStringBuf json);
+std::variant<TBinaryJson, TString> SerializeToBinaryJson(const TStringBuf json, bool allowInf = false);
/**
* @brief Translates DOM layout from `yql/library/dom` library into BinaryJson
*/
-TBinaryJson SerializeToBinaryJson(const NUdf::TUnboxedValue& value);
-
+TBinaryJson SerializeToBinaryJson(const NYql::NUdf::TUnboxedValue& value);
}
diff --git a/yql/essentials/udfs/language/yql/yql_language_udf.cpp b/yql/essentials/udfs/language/yql/yql_language_udf.cpp
index 356ecfade3..ab1090a108 100644
--- a/yql/essentials/udfs/language/yql/yql_language_udf.cpp
+++ b/yql/essentials/udfs/language/yql/yql_language_udf.cpp
@@ -1,10 +1,13 @@
#include <yql/essentials/public/udf/udf_helpers.h>
+#include <yql/essentials/sql/v1/context.h>
+#include <yql/essentials/sql/v1/sql_translation.h>
#include <yql/essentials/sql/v1/lexer/antlr4/lexer.h>
#include <yql/essentials/sql/v1/lexer/antlr4_ansi/lexer.h>
#include <yql/essentials/sql/v1/proto_parser/proto_parser.h>
#include <yql/essentials/sql/v1/proto_parser/antlr4/proto_parser.h>
#include <yql/essentials/sql/v1/proto_parser/antlr4_ansi/proto_parser.h>
+#include <yql/essentials/parser/proto_ast/gen/v1_proto_split/SQLv1Parser.pb.main.h>
#include <yql/essentials/sql/v1/format/sql_format.h>
#include <yql/essentials/sql/settings/translation_settings.h>
#include <library/cpp/protobuf/util/simple_reflection.h>
@@ -12,18 +15,34 @@
using namespace NYql;
using namespace NKikimr::NUdf;
using namespace NSQLTranslation;
+using namespace NSQLTranslationV1;
+using namespace NSQLv1Generated;
+
+class TRuleFreqTranslation : public TSqlTranslation
+{
+public:
+ TRuleFreqTranslation(TContext& ctx)
+ : TSqlTranslation(ctx, ctx.Settings.Mode)
+ {}
+};
class TRuleFreqVisitor {
public:
- TRuleFreqVisitor() {
+ TRuleFreqVisitor(TContext& ctx)
+ : Translation(ctx)
+ {
}
void Visit(const NProtoBuf::Message& msg) {
const NProtoBuf::Descriptor* descr = msg.GetDescriptor();
- if (descr->name() == "TToken") {
+ if (descr == TToken::GetDescriptor()) {
return;
}
+ if (descr == TRule_use_stmt::GetDescriptor()) {
+ VisitUseStmt(dynamic_cast<const TRule_use_stmt&>(msg));
+ }
+
TStringBuf fullName = descr->full_name();
fullName.SkipPrefix("NSQLv1Generated.");
for (int i = 0; i < descr->field_count(); ++i) {
@@ -51,6 +70,17 @@ public:
}
private:
+ void VisitUseStmt(const TRule_use_stmt& msg) {
+ const auto& cluster = msg.GetRule_cluster_expr2();
+ if (cluster.GetBlock2().Alt_case() == TRule_cluster_expr::TBlock2::kAlt1) {
+ const auto& val = cluster.GetBlock2().GetAlt1().GetRule_pure_column_or_named1();
+ if (val.Alt_case() == TRule_pure_column_or_named::kAltPureColumnOrNamed2) {
+ const auto& id = val.GetAlt_pure_column_or_named2().GetRule_an_id1();
+ Freqs[std::make_pair("USE", Id(id, Translation))] += 1;
+ }
+ }
+ }
+
void VisitAllFields(const NProtoBuf::Message& msg, const NProtoBuf::Descriptor* descr) {
for (int i = 0; i < descr->field_count(); ++i) {
const NProtoBuf::FieldDescriptor* fd = descr->field(i);
@@ -64,6 +94,7 @@ private:
}
THashMap<std::pair<TString, TString>, ui64> Freqs;
+ TRuleFreqTranslation Translation;
};
SIMPLE_UDF(TObfuscate, TOptional<char*>(TAutoMap<char*>)) {
@@ -125,7 +156,8 @@ SIMPLE_UDF(TRuleFreq, TOptional<TRuleFreqResult>(TAutoMap<char*>)) {
return {};
}
- TRuleFreqVisitor visitor;
+ TContext ctx(lexers, parsers, settings, {}, issues, query);
+ TRuleFreqVisitor visitor(ctx);
visitor.Visit(*msg);
auto listBuilder = valueBuilder->NewListBuilder();