aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@yandex-team.com>2024-11-09 03:48:37 +0300
committervvvv <vvvv@yandex-team.com>2024-11-09 03:57:36 +0300
commit095cad8cefcadf556bf8f53ea1ddecc87b6508c3 (patch)
tree662ae613b32b1e2839c5713396e5cd53445de9c1
parent7a852449509797b666bf1ad202836dcd0408b584 (diff)
downloadydb-095cad8cefcadf556bf8f53ea1ddecc87b6508c3.tar.gz
Tune YDB <-> YQL deps
init commit_hash:16572ab4e94aea4f7455c2ccb90b70ea99a412db
-rw-r--r--yql/essentials/core/dq_integration/transform/ya.make13
-rw-r--r--yql/essentials/core/dq_integration/transform/yql_dq_task_transform.cpp22
-rw-r--r--yql/essentials/core/dq_integration/transform/yql_dq_task_transform.h17
-rw-r--r--yql/essentials/core/dq_integration/ya.make22
-rw-r--r--yql/essentials/core/dq_integration/yql_dq_integration.cpp24
-rw-r--r--yql/essentials/core/dq_integration/yql_dq_integration.h84
-rw-r--r--yql/essentials/core/dq_integration/yql_dq_optimization.cpp1
-rw-r--r--yql/essentials/core/dq_integration/yql_dq_optimization.h86
-rw-r--r--yql/essentials/core/ya.make1
-rw-r--r--yql/essentials/minikql/comp_nodes/packed_tuple/hashes_calc.h65
-rw-r--r--yql/essentials/minikql/comp_nodes/packed_tuple/packed_tuple_ut.cpp899
-rw-r--r--yql/essentials/minikql/comp_nodes/packed_tuple/packing.h424
-rw-r--r--yql/essentials/minikql/comp_nodes/packed_tuple/tuple.cpp983
-rw-r--r--yql/essentials/minikql/comp_nodes/packed_tuple/tuple.h136
-rw-r--r--yql/essentials/minikql/comp_nodes/packed_tuple/ut/ya.make41
-rw-r--r--yql/essentials/minikql/comp_nodes/packed_tuple/ya.make28
-rw-r--r--yql/essentials/minikql/comp_nodes/ya.make1
-rw-r--r--yql/essentials/minikql/comp_nodes/ya.make.inc1
-rw-r--r--yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp1
-rw-r--r--yql/essentials/minikql/computation/mkql_spiller_factory.h6
-rw-r--r--yql/essentials/minikql/computation/mock_spiller_factory_ut.h2
-rw-r--r--yql/essentials/minikql/computation/ut/ya.make.inc1
-rw-r--r--yql/essentials/minikql/invoke_builtins/mkql_builtins_convert.cpp7
-rw-r--r--yql/essentials/minikql/jsonpath/ut/test_base.cpp8
-rw-r--r--yql/essentials/minikql/mkql_type_ops.cpp5
-rw-r--r--yql/essentials/providers/common/dq/ya.make12
-rw-r--r--yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp96
-rw-r--r--yql/essentials/providers/common/dq/yql_dq_integration_impl.h37
-rw-r--r--yql/essentials/providers/common/dq/yql_dq_optimization_impl.cpp29
-rw-r--r--yql/essentials/providers/common/dq/yql_dq_optimization_impl.h17
-rw-r--r--yql/essentials/providers/common/ya.make1
-rw-r--r--yql/essentials/providers/pg/provider/ya.make4
-rw-r--r--yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp2
-rw-r--r--yql/essentials/providers/pg/provider/yql_pg_provider_impl.h2
-rw-r--r--yql/essentials/types/binary_json/ut/container_ut.cpp12
-rw-r--r--yql/essentials/types/binary_json/ut/entry_ut.cpp8
-rw-r--r--yql/essentials/types/binary_json/ut/identity_ut.cpp4
-rw-r--r--yql/essentials/types/binary_json/ut/valid_ut.cpp2
-rw-r--r--yql/essentials/types/binary_json/write.cpp17
-rw-r--r--yql/essentials/types/binary_json/write.h5
40 files changed, 509 insertions, 2617 deletions
diff --git a/yql/essentials/core/dq_integration/transform/ya.make b/yql/essentials/core/dq_integration/transform/ya.make
new file mode 100644
index 0000000000..192fcf8921
--- /dev/null
+++ b/yql/essentials/core/dq_integration/transform/ya.make
@@ -0,0 +1,13 @@
+LIBRARY()
+
+SRCS(
+ yql_dq_task_transform.cpp
+)
+
+PEERDIR(
+ yql/essentials/minikql
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/yql/essentials/core/dq_integration/transform/yql_dq_task_transform.cpp b/yql/essentials/core/dq_integration/transform/yql_dq_task_transform.cpp
new file mode 100644
index 0000000000..d520ec2792
--- /dev/null
+++ b/yql/essentials/core/dq_integration/transform/yql_dq_task_transform.cpp
@@ -0,0 +1,22 @@
+#include <yql/essentials/core/dq_integration/transform/yql_dq_task_transform.h>
+
+namespace NYql {
+
+TTaskTransformFactory CreateCompositeTaskTransformFactory(TVector<TTaskTransformFactory> factories) {
+ return [factories = std::move(factories)] (const THashMap<TString, TString>& taskParams, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry) -> NKikimr::NMiniKQL::TCallableVisitFuncProvider {
+ TVector<NKikimr::NMiniKQL::TCallableVisitFuncProvider> funcProviders;
+ for (auto& factory: factories) {
+ funcProviders.push_back(factory(taskParams, funcRegistry));
+ }
+ return [funcProviders = std::move(funcProviders)] (const NKikimr::NMiniKQL::TInternName& name) -> NKikimr::NMiniKQL::TCallableVisitFunc {
+ for (auto& provider: funcProviders) {
+ if (auto res = provider(name)) {
+ return res;
+ }
+ }
+ return {};
+ };
+ };
+}
+
+} // NYql
diff --git a/yql/essentials/core/dq_integration/transform/yql_dq_task_transform.h b/yql/essentials/core/dq_integration/transform/yql_dq_task_transform.h
new file mode 100644
index 0000000000..bad5401ee4
--- /dev/null
+++ b/yql/essentials/core/dq_integration/transform/yql_dq_task_transform.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <yql/essentials/minikql/mkql_node_visitor.h>
+#include <yql/essentials/minikql/mkql_function_registry.h>
+
+#include <util/generic/hash.h>
+#include <util/generic/string.h>
+
+#include <functional>
+
+namespace NYql {
+
+using TTaskTransformFactory = std::function<NKikimr::NMiniKQL::TCallableVisitFuncProvider(const THashMap<TString, TString>&, const NKikimr::NMiniKQL::IFunctionRegistry*)>;
+
+TTaskTransformFactory CreateCompositeTaskTransformFactory(TVector<TTaskTransformFactory> factories);
+
+} // namespace NYql
diff --git a/yql/essentials/core/dq_integration/ya.make b/yql/essentials/core/dq_integration/ya.make
new file mode 100644
index 0000000000..9e1ba48828
--- /dev/null
+++ b/yql/essentials/core/dq_integration/ya.make
@@ -0,0 +1,22 @@
+LIBRARY()
+
+SRCS(
+ yql_dq_integration.cpp
+ yql_dq_optimization.cpp
+)
+
+PEERDIR(
+ contrib/libs/protobuf
+ library/cpp/yson
+ yql/essentials/ast
+ yql/essentials/core
+ yql/essentials/core/expr_nodes
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
+
+RECURSE(
+ transform
+)
diff --git a/yql/essentials/core/dq_integration/yql_dq_integration.cpp b/yql/essentials/core/dq_integration/yql_dq_integration.cpp
new file mode 100644
index 0000000000..ff1d0cfb84
--- /dev/null
+++ b/yql/essentials/core/dq_integration/yql_dq_integration.cpp
@@ -0,0 +1,24 @@
+#include "yql_dq_integration.h"
+
+#include <yql/essentials/core/yql_type_annotation.h>
+
+namespace NYql {
+
+std::unordered_set<IDqIntegration*> GetUniqueIntegrations(const TTypeAnnotationContext& typesCtx) {
+ std::unordered_set<IDqIntegration*> uniqueIntegrations(typesCtx.DataSources.size() + typesCtx.DataSinks.size());
+ for (const auto& provider : typesCtx.DataSources) {
+ if (auto* dqIntegration = provider->GetDqIntegration()) {
+ uniqueIntegrations.emplace(dqIntegration);
+ }
+ }
+
+ for (const auto& provider : typesCtx.DataSinks) {
+ if (auto* dqIntegration = provider->GetDqIntegration()) {
+ uniqueIntegrations.emplace(dqIntegration);
+ }
+ }
+
+ return uniqueIntegrations;
+}
+
+} // namespace NYql
diff --git a/yql/essentials/core/dq_integration/yql_dq_integration.h b/yql/essentials/core/dq_integration/yql_dq_integration.h
new file mode 100644
index 0000000000..4f08726cf2
--- /dev/null
+++ b/yql/essentials/core/dq_integration/yql_dq_integration.h
@@ -0,0 +1,84 @@
+#pragma once
+
+#include <yql/essentials/ast/yql_expr.h>
+#include <yql/essentials/core/yql_data_provider.h>
+#include <yql/essentials/core/yql_statistics.h>
+#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
+#include <yql/essentials/public/issue/yql_issue.h>
+
+#include <library/cpp/yson/writer.h>
+
+#include <util/generic/string.h>
+#include <util/generic/vector.h>
+#include <util/generic/map.h>
+#include <util/generic/maybe.h>
+
+#include <google/protobuf/any.pb.h>
+
+namespace NJson {
+class TJsonValue;
+} // namespace NJson
+
+namespace NYql {
+
+struct TDqSettings;
+class TTransformationPipeline;
+
+namespace NCommon {
+ class TMkqlCallableCompilerBase;
+}
+
+class TFallbackError: public yexception {
+public:
+ TFallbackError(TIssuePtr issue = {})
+ : Issue_(std::move(issue))
+ {}
+
+ TIssuePtr GetIssue() const {
+ return Issue_;
+ }
+private:
+ TIssuePtr Issue_;
+};
+
+class IDqIntegration {
+public:
+ virtual ~IDqIntegration() {}
+
+ virtual ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node,
+ TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) = 0;
+ virtual bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues = false) = 0;
+ virtual bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0;
+ virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) = 0;
+ virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0;
+ virtual TMaybe<TOptimizerStatistics> ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) = 0;
+ virtual TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0;
+
+ // Nothing if callable is not for writing,
+ // false if callable is for writing and there are some errors (they are added to ctx),
+ // true if callable is for writing and no issues occured.
+ virtual TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) = 0;
+
+ virtual TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0;
+ virtual bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) = 0;
+ virtual void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) = 0;
+ virtual bool CanFallback() = 0;
+ virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType, size_t maxPartitions, TExprContext& ctx) = 0;
+ virtual void FillLookupSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) = 0;
+ virtual void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) = 0;
+ virtual void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) = 0;
+ virtual void Annotate(const TExprNode& node, THashMap<TString, TString>& params) = 0;
+ virtual bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) = 0;
+ virtual void WriteFullResultTableRef(NYson::TYsonWriter& writer, const TVector<TString>& columns, const THashMap<TString, TString>& graphParams) = 0;
+
+ // Fill plan operator properties for sources/sinks
+ // Return true if node was handled
+ virtual bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0;
+ virtual bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0;
+ // Called to configure DQ peephole
+ virtual void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) = 0;
+};
+
+std::unordered_set<IDqIntegration*> GetUniqueIntegrations(const TTypeAnnotationContext& typesCtx);
+
+} // namespace NYql
diff --git a/yql/essentials/core/dq_integration/yql_dq_optimization.cpp b/yql/essentials/core/dq_integration/yql_dq_optimization.cpp
new file mode 100644
index 0000000000..b7f55b1f36
--- /dev/null
+++ b/yql/essentials/core/dq_integration/yql_dq_optimization.cpp
@@ -0,0 +1 @@
+#include "yql_dq_optimization.h"
diff --git a/yql/essentials/core/dq_integration/yql_dq_optimization.h b/yql/essentials/core/dq_integration/yql_dq_optimization.h
new file mode 100644
index 0000000000..80b39bb76a
--- /dev/null
+++ b/yql/essentials/core/dq_integration/yql_dq_optimization.h
@@ -0,0 +1,86 @@
+#pragma once
+
+#include <yql/essentials/ast/yql_expr.h>
+
+namespace NYql {
+
+class IDqOptimization {
+public:
+ virtual ~IDqOptimization() {}
+
+ /**
+ Rewrite DqReadWrap's underlying provider specific read callable
+ Args:
+ * read - provider specific read callable
+ * ctx - expr context
+ Returns one of:
+ * empty TPtr on error
+ * original `read`, if no changes
+ * new read, if any optimizations
+ */
+ virtual TExprNode::TPtr RewriteRead(const TExprNode::TPtr& read, TExprContext& ctx) = 0;
+
+ /**
+ Rewrite DqReadWrap's underlying provider specific read callable for lookup
+ Args:
+ * read - provider specific read callable
+ * ctx - expr context
+ Returns of of:
+ * empty TPtr, if lookup read is not supported
+ * DqLookupSourceWrap callable
+ */
+ virtual TExprNode::TPtr RewriteLookupRead(const TExprNode::TPtr& read, TExprContext& ctx) = 0;
+
+ /**
+ Apply new members subset for DqReadWrap's underlying provider specific read callable
+ Args:
+ * read - provider specific read callable
+ * members - expr list of atoms with new members
+ * ctx - expr context
+ Returns one of:
+ * empty TPtr on error
+ * original `read`, if no changes
+ * new read with applyed new members
+ */
+ virtual TExprNode::TPtr ApplyExtractMembers(const TExprNode::TPtr& read, const TExprNode::TPtr& members, TExprContext& ctx) = 0;
+
+ /**
+ Apply `take` or `skip` setting for DqReadWrap's underlying provider specific read callable
+ Args:
+ * read - provider specific read callable
+ * countBase - `Take`, `Skip` or `Limit` callable
+ * ctx - expr context
+ Returns one of:
+ * empty TPtr on error
+ * original `read`, if no changes
+ * new read with applyed setting
+ */
+ virtual TExprNode::TPtr ApplyTakeOrSkip(const TExprNode::TPtr& read, const TExprNode::TPtr& countBase, TExprContext& ctx) = 0;
+
+ /**
+ Apply `unordered` setting for DqReadWrap's underlying provider specific read callable
+ Args:
+ * read - provider specific read callable
+ * ctx - expr context
+ Returns one of:
+ * empty TPtr on error
+ * original `read`, if no changes
+ * new read with applyed setting
+ */
+ virtual TExprNode::TPtr ApplyUnordered(const TExprNode::TPtr& read, TExprContext& ctx) = 0;
+
+ /**
+ Optimize list/stream extend for set of DqReadWrap's underlying provider specific read callable
+ Args:
+ * listOfRead - expr list of provider specific read callables
+ * ordered - `true` for ordered extend (must keep original order of reads); `false`, if reads may be reordred due the optimization
+ * ctx - expr context
+ Returns one of:
+ * empty list on error
+ * original `listOfRead`, if no changes
+ * new optimized list of reads. Returned list length must not be greater than original `listOfRead` length
+ */
+ virtual TExprNode::TListType ApplyExtend(const TExprNode::TListType& listOfRead, bool ordered, TExprContext& ctx) = 0;
+};
+
+} // namespace NYql
diff --git a/yql/essentials/core/ya.make b/yql/essentials/core/ya.make
index 556e53a570..2e253709c2 100644
--- a/yql/essentials/core/ya.make
+++ b/yql/essentials/core/ya.make
@@ -98,6 +98,7 @@ END()
RECURSE(
cbo
credentials
+ dq_integration
file_storage
issue
minsketch
diff --git a/yql/essentials/minikql/comp_nodes/packed_tuple/hashes_calc.h b/yql/essentials/minikql/comp_nodes/packed_tuple/hashes_calc.h
deleted file mode 100644
index a33679e4e5..0000000000
--- a/yql/essentials/minikql/comp_nodes/packed_tuple/hashes_calc.h
+++ /dev/null
@@ -1,65 +0,0 @@
-#pragma once
-
-#include <contrib/ydb/library/yql/utils/simd/simd.h>
-
-namespace NKikimr {
-namespace NMiniKQL {
-namespace NPackedTuple {
-
-
-
-template <typename TTraits>
-inline ui32 CalculateCRC32(const ui8 * data, ui32 size, ui32 hash = 0 ) {
-
- using TSimdI8 = typename TTraits::TSimdI8;
-
- while (size >= 8) {
- hash = TSimdI8::CRC32u64(hash, ReadUnaligned<ui64>(data));
- size -= 8;
- data += 8;
- }
-
- switch(size) {
- case 7:
- hash = TSimdI8::CRC32u32(hash, ReadUnaligned<ui32>(data));
- data += 4;
- [[fallthrough]];
- case 3:
- hash = TSimdI8::CRC32u16(hash, ReadUnaligned<ui16>(data));
- data += 2;
- [[fallthrough]];
- case 1:
- hash = TSimdI8::CRC32u8(hash, ReadUnaligned<ui8>(data));
- break;
- case 6:
- hash = TSimdI8::CRC32u32(hash, ReadUnaligned<ui32>(data));
- data += 4;
- [[fallthrough]];
- case 2:
- hash = TSimdI8::CRC32u16(hash, ReadUnaligned<ui16>(data));
- break;
- case 5:
- hash = TSimdI8::CRC32u32(hash, ReadUnaligned<ui32>(data));
- data += 4;
- hash = TSimdI8::CRC32u8(hash, ReadUnaligned<ui8>(data));
- break;
- case 4:
- hash = TSimdI8::CRC32u32(hash, ReadUnaligned<ui32>(data));
- break;
- case 0:
- break;
- }
- return hash;
-
-}
-template
-__attribute__((target("avx2")))
-ui32 CalculateCRC32<NSimd::TSimdAVX2Traits>(const ui8 * data, ui32 size, ui32 hash = 0 );
-template
-__attribute__((target("sse4.2")))
-ui32 CalculateCRC32<NSimd::TSimdSSE42Traits>(const ui8 * data, ui32 size, ui32 hash = 0 );
-}
-
-}
-
-}
diff --git a/yql/essentials/minikql/comp_nodes/packed_tuple/packed_tuple_ut.cpp b/yql/essentials/minikql/comp_nodes/packed_tuple/packed_tuple_ut.cpp
deleted file mode 100644
index 25ac2a46c3..0000000000
--- a/yql/essentials/minikql/comp_nodes/packed_tuple/packed_tuple_ut.cpp
+++ /dev/null
@@ -1,899 +0,0 @@
-#include <yql/essentials/minikql/mkql_runtime_version.h>
-#include <yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h>
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <chrono>
-#include <vector>
-#include <set>
-#include <random>
-
-#include <util/system/fs.h>
-#include <util/system/compiler.h>
-#include <util/stream/null.h>
-#include <util/system/mem_info.h>
-
-#include <yql/essentials/minikql/comp_nodes/packed_tuple/hashes_calc.h>
-#include <yql/essentials/minikql/comp_nodes/packed_tuple/tuple.h>
-
-#include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
-
-namespace NKikimr {
-namespace NMiniKQL {
-namespace NPackedTuple {
-
-using namespace std::chrono_literals;
-
-static volatile bool IsVerbose = false;
-#define CTEST (IsVerbose ? Cerr : Cnull)
-
-namespace {
-
-template <typename TTraits>
-void TestCalculateCRC32_Impl() {
- std::mt19937_64 rng; // fixed-seed (0) prng
- std::vector<ui64> v(1024);
- std::generate(v.begin(), v.end(), rng);
-
- ui64 nanoseconds = 0;
- ui64 totalBytes = 0;
- ui32 hash = 0;
- for (ui32 test = 0; test < 65535; ++test) {
- ui32 bytes = rng() % (sizeof(v[0])*v.size());
-
- std::chrono::steady_clock::time_point begin01 = std::chrono::steady_clock::now();
- hash = CalculateCRC32<TTraits>((const ui8 *) v.data(), bytes, hash);
- std::chrono::steady_clock::time_point end01 = std::chrono::steady_clock::now();
-
- nanoseconds += std::chrono::duration_cast<std::chrono::nanoseconds>(end01 - begin01).count();
- totalBytes += bytes;
- }
- CTEST << "Hash: " << hash << Endl;
- UNIT_ASSERT_VALUES_EQUAL(hash, 80113928);
- CTEST << "Data Size: " << totalBytes << Endl;
- CTEST << "Time for hash: " << ((nanoseconds + 999)/1000) << "[microseconds]" << Endl;
- CTEST << "Calculating speed: " << totalBytes / ((nanoseconds + 999)/1000) << "MB/sec" << Endl;
-}
-}
-
-Y_UNIT_TEST_SUITE(TestHash) {
-
-Y_UNIT_TEST(TestCalculateCRC32Fallback) {
- TestCalculateCRC32_Impl<NSimd::TSimdFallbackTraits>();
-}
-
-Y_UNIT_TEST(TestCalculateCRC32SSE42) {
- if (NX86::HaveSSE42())
- TestCalculateCRC32_Impl<NSimd::TSimdSSE42Traits>();
- else
- CTEST << "Skipped SSE42 test\n";
-}
-
-Y_UNIT_TEST(TestCalculateCRC32AVX2) {
- if (NX86::HaveAVX2())
- TestCalculateCRC32_Impl<NSimd::TSimdAVX2Traits>();
- else
- CTEST << "Skipped AVX2 test\n";
-}
-
-}
-
-Y_UNIT_TEST_SUITE(TupleLayout) {
-Y_UNIT_TEST(CreateLayout) {
-
- TColumnDesc kc1, kc2, pc1, pc2, pc3;
-
- kc1.Role = EColumnRole::Key;
- kc1.DataSize = 8;
-
- kc2.Role = EColumnRole::Key;
- kc2.DataSize = 4;
-
- pc1.Role = EColumnRole::Payload;
- pc1.DataSize = 16;
-
- pc2.Role = EColumnRole::Payload;
- pc2.DataSize = 4;
-
- pc3.Role = EColumnRole::Payload;
- pc3.DataSize = 8;
-
- std::vector<TColumnDesc> columns{kc1, kc2, pc1, pc2, pc3};
-
- auto tl = TTupleLayout::Create(columns);
- UNIT_ASSERT(tl->TotalRowSize == 45);
-}
-
-Y_UNIT_TEST(Pack) {
-
- TScopedAlloc alloc(__LOCATION__);
-
- TColumnDesc kc1, kc2, pc1, pc2;
-
- kc1.Role = EColumnRole::Key;
- kc1.DataSize = 8;
-
- kc2.Role = EColumnRole::Key;
- kc2.DataSize = 4;
-
- pc1.Role = EColumnRole::Payload;
- pc1.DataSize = 8;
-
- pc2.Role = EColumnRole::Payload;
- pc2.DataSize = 4;
-
- std::vector<TColumnDesc> columns{kc1, kc2, pc1, pc2};
-
- auto tl = TTupleLayout::Create(columns);
- UNIT_ASSERT(tl->TotalRowSize == 29);
-
- const ui64 NTuples1 = 10e6;
-
- const ui64 Tuples1DataBytes = (tl->TotalRowSize) * NTuples1;
-
- std::vector<ui64> col1(NTuples1, 0);
- std::vector<ui32> col2(NTuples1, 0);
- std::vector<ui64> col3(NTuples1, 0);
- std::vector<ui32> col4(NTuples1, 0);
-
- std::vector<ui8> res(Tuples1DataBytes + 64, 0);
-
- for (ui32 i = 0; i < NTuples1; ++i) {
- col1[i] = i;
- col2[i] = i;
- col3[i] = i;
- col4[i] = i;
- }
-
- const ui8* cols[4];
-
- cols[0] = (ui8*) col1.data();
- cols[1] = (ui8*) col2.data();
- cols[2] = (ui8*) col3.data();
- cols[3] = (ui8*) col4.data();
-
- std::chrono::steady_clock::time_point begin02 = std::chrono::steady_clock::now();
-
- std::vector<ui8> colValid1((NTuples1 + 7)/8, ~0);
- std::vector<ui8> colValid2((NTuples1 + 7)/8, ~0);
- std::vector<ui8> colValid3((NTuples1 + 7)/8, ~0);
- std::vector<ui8> colValid4((NTuples1 + 7)/8, ~0);
- const ui8 *colsValid[4] = {
- colValid1.data(),
- colValid2.data(),
- colValid3.data(),
- colValid4.data(),
- };
-
- std::vector<ui8, TMKQLAllocator<ui8>> overflow;
- tl->Pack(cols, colsValid, res.data(), overflow, 0, NTuples1);
- std::chrono::steady_clock::time_point end02 = std::chrono::steady_clock::now();
- ui64 microseconds = std::chrono::duration_cast<std::chrono::microseconds>(end02 - begin02).count();
- if (microseconds == 0) microseconds = 1;
-
- CTEST << "Time for " << (NTuples1) << " transpose (external cycle)= " << microseconds << "[microseconds]" << Endl;
- CTEST << "Data size = " << Tuples1DataBytes / (1024 * 1024) << "[MB]" << Endl;
- CTEST << "Calculating speed = " << Tuples1DataBytes / microseconds << "MB/sec" << Endl;
- CTEST << Endl;
-
- UNIT_ASSERT(true);
-
-}
-
-Y_UNIT_TEST(Unpack) {
-
- TScopedAlloc alloc(__LOCATION__);
-
- TColumnDesc kc1, kc2, pc1, pc2;
-
- kc1.Role = EColumnRole::Key;
- kc1.DataSize = 8;
-
- kc2.Role = EColumnRole::Key;
- kc2.DataSize = 4;
-
- pc1.Role = EColumnRole::Payload;
- pc1.DataSize = 8;
-
- pc2.Role = EColumnRole::Payload;
- pc2.DataSize = 4;
-
- std::vector<TColumnDesc> columns{kc1, kc2, pc1, pc2};
-
- auto tl = TTupleLayout::Create(columns);
- UNIT_ASSERT(tl->TotalRowSize == 29);
-
- const ui64 NTuples1 = 10e6;
-
- const ui64 Tuples1DataBytes = (tl->TotalRowSize) * NTuples1;
-
- std::vector<ui64> col1(NTuples1, 0);
- std::vector<ui32> col2(NTuples1, 0);
- std::vector<ui64> col3(NTuples1, 0);
- std::vector<ui32> col4(NTuples1, 0);
-
- std::vector<ui8> res(Tuples1DataBytes + 64, 0);
-
- for (ui32 i = 0; i < NTuples1; ++i) {
- col1[i] = i;
- col2[i] = i;
- col3[i] = i;
- col4[i] = i;
- }
-
- const ui8* cols[4];
-
- cols[0] = (ui8*) col1.data();
- cols[1] = (ui8*) col2.data();
- cols[2] = (ui8*) col3.data();
- cols[3] = (ui8*) col4.data();
-
- std::vector<ui8> colValid1((NTuples1 + 7)/8, ~0);
- std::vector<ui8> colValid2((NTuples1 + 7)/8, ~0);
- std::vector<ui8> colValid3((NTuples1 + 7)/8, ~0);
- std::vector<ui8> colValid4((NTuples1 + 7)/8, ~0);
- const ui8 *colsValid[4] = {
- colValid1.data(),
- colValid2.data(),
- colValid3.data(),
- colValid4.data(),
- };
-
- std::vector<ui8, TMKQLAllocator<ui8>> overflow;
- tl->Pack(cols, colsValid, res.data(), overflow, 0, NTuples1);
-
- std::vector<ui64> col1_new(NTuples1, 0);
- std::vector<ui32> col2_new(NTuples1, 0);
- std::vector<ui64> col3_new(NTuples1, 0);
- std::vector<ui32> col4_new(NTuples1, 0);
-
- ui8* cols_new[4];
- cols_new[0] = (ui8*) col1_new.data();
- cols_new[1] = (ui8*) col2_new.data();
- cols_new[2] = (ui8*) col3_new.data();
- cols_new[3] = (ui8*) col4_new.data();
-
- std::vector<ui8> colValid1_new((NTuples1 + 7)/8, 0);
- std::vector<ui8> colValid2_new((NTuples1 + 7)/8, 0);
- std::vector<ui8> colValid3_new((NTuples1 + 7)/8, 0);
- std::vector<ui8> colValid4_new((NTuples1 + 7)/8, 0);
-
- ui8 *colsValid_new[4] = {
- colValid1_new.data(),
- colValid2_new.data(),
- colValid3_new.data(),
- colValid4_new.data(),
- };
-
- std::chrono::steady_clock::time_point begin02 = std::chrono::steady_clock::now();
- tl->Unpack(cols_new, colsValid_new, res.data(), overflow, 0, NTuples1);
- std::chrono::steady_clock::time_point end02 = std::chrono::steady_clock::now();
- ui64 microseconds = std::chrono::duration_cast<std::chrono::microseconds>(end02 - begin02).count();
-
- if (microseconds == 0) microseconds = 1;
-
- CTEST << "Time for " << (NTuples1) << " transpose (external cycle)= " << microseconds << "[microseconds]" << Endl;
- CTEST << "Data size = " << Tuples1DataBytes / (1024 * 1024) << "[MB]" << Endl;
- CTEST << "Calculating speed = " << Tuples1DataBytes / microseconds << "MB/sec" << Endl;
- CTEST << Endl;
-
- UNIT_ASSERT(std::memcmp(col1.data(), col1_new.data(), sizeof(ui64) * col1.size()) == 0);
- UNIT_ASSERT(std::memcmp(col2.data(), col2_new.data(), sizeof(ui32) * col2.size()) == 0);
- UNIT_ASSERT(std::memcmp(col3.data(), col3_new.data(), sizeof(ui64) * col3.size()) == 0);
- UNIT_ASSERT(std::memcmp(col4.data(), col4_new.data(), sizeof(ui32) * col4.size()) == 0);
-
- UNIT_ASSERT(std::memcmp(colValid1.data(), colValid1_new.data(), colValid1.size()) == 0);
- UNIT_ASSERT(std::memcmp(colValid2.data(), colValid2_new.data(), colValid2.size()) == 0);
- UNIT_ASSERT(std::memcmp(colValid3.data(), colValid3_new.data(), colValid3.size()) == 0);
- UNIT_ASSERT(std::memcmp(colValid4.data(), colValid4_new.data(), colValid4.size()) == 0);
-}
-
-Y_UNIT_TEST(PackVarSize) {
-
- TScopedAlloc alloc(__LOCATION__);
-
- TColumnDesc kc1, kcv1, kcv2, kc2, pc1, pc2;
-
- kc1.Role = EColumnRole::Key;
- kc1.DataSize = 8;
-
- kc2.Role = EColumnRole::Key;
- kc2.DataSize = 4;
-
- pc1.Role = EColumnRole::Payload;
- pc1.DataSize = 8;
-
- pc2.Role = EColumnRole::Payload;
- pc2.DataSize = 4;
-
- kcv1.Role = EColumnRole::Key;
- kcv1.DataSize = 8;
- kcv1.SizeType = EColumnSizeType::Variable;
-
- kcv2.Role = EColumnRole::Key;
- kcv2.DataSize = 16;
- kcv2.SizeType = EColumnSizeType::Variable;
-
- pc1.Role = EColumnRole::Payload;
- pc1.DataSize = 8;
-
- pc2.Role = EColumnRole::Payload;
- pc2.DataSize = 4;
-
- std::vector<TColumnDesc> columns{kc1, kc2, kcv1, kcv2, pc1, pc2};
-
- auto tl = TTupleLayout::Create(columns);
- CTEST << "TotalRowSize = " << tl->TotalRowSize << Endl;
- UNIT_ASSERT_VALUES_EQUAL(tl->TotalRowSize, 54);
-
- const ui64 NTuples1 = 3;
-
- const ui64 Tuples1DataBytes = (tl->TotalRowSize) * NTuples1;
-
- std::vector<ui64> col1(NTuples1, 0);
- std::vector<ui32> col2(NTuples1, 0);
- std::vector<ui64> col3(NTuples1, 0);
- std::vector<ui32> col4(NTuples1, 0);
-
- std::vector<ui32> vcol1(1, 0);
-
- std::vector<ui8> vcol1data;
- std::vector<ui32> vcol2(1, 0);
- std::vector<ui8> vcol2data;
-
- std::vector<ui8> res(Tuples1DataBytes + 64, 0);
- std::vector<TString> vcol1str {
- "abc",
- "ABCDEFGHIJKLMNO",
- "ZYXWVUTSPR"
- };
- std::vector<TString> vcol2str {
- "ABC",
- "abcdefghijklmno",
- "zyxwvutspr"
- };
- for (auto &&str: vcol1str) {
- for (auto c: str)
- vcol1data.push_back(c);
- vcol1.push_back(vcol1data.size());
- }
- UNIT_ASSERT_VALUES_EQUAL(vcol1.size(), NTuples1 + 1);
- for (auto &&str: vcol2str) {
- for (auto c: str)
- vcol2data.push_back(c);
- vcol2.push_back(vcol2data.size());
- }
- UNIT_ASSERT_VALUES_EQUAL(vcol2.size(), NTuples1 + 1);
- for (ui32 i = 0; i < NTuples1; ++i) {
- col1[i] = (1ull<<(sizeof(col1[0])*8 - 4)) + i + 1;
- col2[i] = (2ull<<(sizeof(col2[0])*8 - 4)) + i + 1;
- col3[i] = (3ull<<(sizeof(col3[0])*8 - 4)) + i + 1;
- col4[i] = (4ull<<(sizeof(col4[0])*8 - 4)) + i + 1;
- }
-
- const ui8* cols[4 + 2*2];
-
- cols[0] = (ui8*) col1.data();
- cols[1] = (ui8*) col2.data();
- cols[2] = (ui8*) vcol1.data();
- cols[3] = (ui8*) vcol1data.data();
- cols[4] = (ui8*) vcol2.data();
- cols[5] = (ui8*) vcol2data.data();
- cols[6] = (ui8*) col3.data();
- cols[7] = (ui8*) col4.data();
-
- std::vector<ui8, TMKQLAllocator<ui8>> overflow;
- std::vector<ui8> colValid((NTuples1 + 7)/8, ~0);
- const ui8 *colsValid[8] = {
- colValid.data(),
- colValid.data(),
- colValid.data(),
- nullptr,
- colValid.data(),
- nullptr,
- colValid.data(),
- colValid.data(),
- };
-
- std::chrono::steady_clock::time_point begin02 = std::chrono::steady_clock::now();
- tl->Pack(cols, colsValid, res.data(), overflow, 0, NTuples1);
- std::chrono::steady_clock::time_point end02 = std::chrono::steady_clock::now();
- ui64 microseconds = std::chrono::duration_cast<std::chrono::microseconds>(end02 - begin02).count();
-
- if (microseconds == 0)
- microseconds = 1;
-
- CTEST << "Time for " << (NTuples1) << " transpose (external cycle)= " << microseconds << "[microseconds]" << Endl;
-#ifndef NDEBUG
- CTEST << "Result size = " << Tuples1DataBytes << Endl;
- CTEST << "Result = ";
- for (ui32 i = 0; i < Tuples1DataBytes; ++i)
- CTEST << int(res[i]) << ' ';
- CTEST << Endl;
- CTEST << "Overflow size = " << overflow.size() << Endl;
- CTEST << "Overflow = ";
- for (auto c: overflow)
- CTEST << int(c) << ' ';
- CTEST << Endl;
-#endif
- static const ui8 expected_data[54*3] = {
- // row1
-
- 0xe2,0x47,0x16,0x6c, // hash
- 0x1, 0, 0, 0x20, // col1
- 0x1, 0, 0, 0, 0, 0, 0, 0x10, // col2
- 0x3, 0x61, 0x62, 0x63, 0, 0, 0, 0, 0, // vcol1
- 0x3, 0x41, 0x42, 0x43, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // vcol2
- 0x3f, //NULL bitmap
- 0x1, 0, 0, 0x40, // col3
- 0x1, 0, 0, 0, 0, 0, 0, 0x30, // col4
- // row2
- 0xc2, 0x1c, 0x1b, 0xa8, // hash
- 0x2, 0, 0, 0x20, // col1
- 0x2, 0, 0, 0, 0, 0, 0, 0x10, // col2
- 0xff, 0, 0, 0, 0, 0xf, 0, 0, 0, // vcol1 [overflow offset, overflow size]
- 0xf, 0x61, 0x62, 0x63, 0x64, 0x65, 0x66, 0x67, 0x68, 0x69, 0x6a, 0x6b, 0x6c, 0x6d, 0x6e, 0x6f, // vcol2
- 0x3f, // NULL bitmap
- 0x2, 0, 0, 0x40, // col3
- 0x2, 0, 0, 0, 0, 0, 0, 0x30, // col4
- // row3
- 0xfa, 0x49, 0x5, 0xe9, // hash
- 0x3, 0, 0, 0x20, // col1
- 0x3, 0, 0, 0, 0, 0, 0, 0x10, // col2
- 0xff, 0xf, 0, 0, 0, 0xa, 0, 0, 0, // vcol1 [overflow offset, overflow size]
- 0xa, 0x7a, 0x79, 0x78, 0x77, 0x76, 0x75, 0x74, 0x73, 0x70, 0x72, 0, 0, 0, 0, 0, // vcol2
- 0x3f, // NULL bitmap
- 0x3, 0, 0, 0x40, // col3
- 0x3, 0, 0, 0, 0, 0, 0, 0x30, // col4
- };
- static const ui8 expected_overflow[25] = {
- 0x41, 0x42, 0x43, 0x44, 0x45, 0x46, 0x47, 0x48, 0x49, 0x4a, 0x4b, 0x4c, 0x4d, 0x4e, 0x4f,
- 0x5a, 0x59, 0x58, 0x57, 0x56, 0x55, 0x54, 0x53, 0x50, 0x52,
- };
- UNIT_ASSERT_VALUES_EQUAL(sizeof(expected_data), tl->TotalRowSize*NTuples1);
- UNIT_ASSERT_VALUES_EQUAL(overflow.size(), sizeof(expected_overflow));
- for (ui32 i = 0; i < sizeof(expected_data); ++i)
- UNIT_ASSERT_VALUES_EQUAL(expected_data[i], res[i]);
- for (ui32 i = 0; i < sizeof(expected_overflow); ++i)
- UNIT_ASSERT_VALUES_EQUAL(expected_overflow[i], overflow[i]);
-}
-
-Y_UNIT_TEST(UnpackVarSize) {
-
- TScopedAlloc alloc(__LOCATION__);
-
- TColumnDesc kc1, kcv1, kcv2, kc2, pc1, pc2;
-
- kc1.Role = EColumnRole::Key;
- kc1.DataSize = 8;
-
- kc2.Role = EColumnRole::Key;
- kc2.DataSize = 4;
-
- pc1.Role = EColumnRole::Payload;
- pc1.DataSize = 8;
-
- pc2.Role = EColumnRole::Payload;
- pc2.DataSize = 4;
-
- kcv1.Role = EColumnRole::Key;
- kcv1.DataSize = 8;
- kcv1.SizeType = EColumnSizeType::Variable;
-
- kcv2.Role = EColumnRole::Key;
- kcv2.DataSize = 16;
- kcv2.SizeType = EColumnSizeType::Variable;
-
- pc1.Role = EColumnRole::Payload;
- pc1.DataSize = 8;
-
- pc2.Role = EColumnRole::Payload;
- pc2.DataSize = 4;
-
- std::vector<TColumnDesc> columns{kc1, kc2, kcv1, kcv2, pc1, pc2};
-
- auto tl = TTupleLayout::Create(columns);
- CTEST << "TotalRowSize = " << tl->TotalRowSize << Endl;
- UNIT_ASSERT_VALUES_EQUAL(tl->TotalRowSize, 54);
-
- const ui64 NTuples1 = 3;
-
- const ui64 Tuples1DataBytes = (tl->TotalRowSize) * NTuples1;
-
- std::vector<ui64> col1(NTuples1, 0);
- std::vector<ui32> col2(NTuples1, 0);
- std::vector<ui64> col3(NTuples1, 0);
- std::vector<ui32> col4(NTuples1, 0);
-
- std::vector<ui32> vcol1(1, 0);
- std::vector<ui8> vcol1data;
- std::vector<ui32> vcol2(1, 0);
- std::vector<ui8> vcol2data;
-
- std::vector<ui8> res(Tuples1DataBytes + 64, 0);
- std::vector<TString> vcol1str {
- "abc",
- "ABCDEFGHIJKLMNO",
- "ZYXWVUTSPR"
- };
- std::vector<TString> vcol2str {
- "ABC",
- "abcdefghijklmno",
- "zyxwvutspr"
- };
- for (auto &&str: vcol1str) {
- for (auto c: str)
- vcol1data.push_back(c);
- vcol1.push_back(vcol1data.size());
- }
- UNIT_ASSERT_VALUES_EQUAL(vcol1.size(), NTuples1 + 1);
- for (auto &&str: vcol2str) {
- for (auto c: str)
- vcol2data.push_back(c);
- vcol2.push_back(vcol2data.size());
- }
- UNIT_ASSERT_VALUES_EQUAL(vcol2.size(), NTuples1 + 1);
- for (ui32 i = 0; i < NTuples1; ++i) {
- col1[i] = (1ull<<(sizeof(col1[0])*8 - 4)) + i + 1;
- col2[i] = (2ull<<(sizeof(col2[0])*8 - 4)) + i + 1;
- col3[i] = (3ull<<(sizeof(col3[0])*8 - 4)) + i + 1;
- col4[i] = (4ull<<(sizeof(col4[0])*8 - 4)) + i + 1;
- }
-
- const ui8* cols[4 + 2*2];
-
- cols[0] = (ui8*) col1.data();
- cols[1] = (ui8*) col2.data();
- cols[2] = (ui8*) vcol1.data();
- cols[3] = (ui8*) vcol1data.data();
- cols[4] = (ui8*) vcol2.data();
- cols[5] = (ui8*) vcol2data.data();
- cols[6] = (ui8*) col3.data();
- cols[7] = (ui8*) col4.data();
-
- std::vector<ui8, TMKQLAllocator<ui8>> overflow;
- std::vector<ui8> colValid((NTuples1 + 7)/8, ~0);
- const ui8 *colsValid[8] = {
- colValid.data(),
- colValid.data(),
- colValid.data(),
- nullptr,
- colValid.data(),
- nullptr,
- colValid.data(),
- colValid.data(),
- };
-
- tl->Pack(cols, colsValid, res.data(), overflow, 0, NTuples1);
-
- std::vector<ui64> col1_new(NTuples1, 0);
- std::vector<ui32> col2_new(NTuples1, 0);
- std::vector<ui64> col3_new(NTuples1, 0);
- std::vector<ui32> col4_new(NTuples1, 0);
-
- std::vector<ui32> vcol1_new(NTuples1 + 1, 0);
- std::vector<ui8> vcol1data_new(vcol1data.size());
- std::vector<ui32> vcol2_new(NTuples1 + 1, 0);
- std::vector<ui8> vcol2data_new(vcol2data.size());
-
- ui8* cols_new[4 + 2 * 2];
- cols_new[0] = (ui8*) col1_new.data();
- cols_new[1] = (ui8*) col2_new.data();
- cols_new[2] = (ui8*) vcol1_new.data();
- cols_new[3] = (ui8*) vcol1data_new.data();
- cols_new[4] = (ui8*) vcol2_new.data();
- cols_new[5] = (ui8*) vcol2data_new.data();
- cols_new[6] = (ui8*) col3_new.data();
- cols_new[7] = (ui8*) col4_new.data();
-
- std::vector<ui8> colValid1_new((NTuples1 + 7)/8, 0);
- colValid1_new.back() = ~0;
- std::vector<ui8> colValid2_new((NTuples1 + 7)/8, 0);
- colValid2_new.back() = ~0;
- std::vector<ui8> colValid3_new((NTuples1 + 7)/8, 0);
- colValid3_new.back() = ~0;
- std::vector<ui8> colValid4_new((NTuples1 + 7)/8, 0);
- colValid4_new.back() = ~0;
- std::vector<ui8> colValid5_new((NTuples1 + 7)/8, 0);
- colValid5_new.back() = ~0;
- std::vector<ui8> colValid6_new((NTuples1 + 7)/8, 0);
- colValid6_new.back() = ~0;
-
- ui8 *colsValid_new[8] = {
- colValid1_new.data(),
- colValid2_new.data(),
- colValid3_new.data(),
- nullptr,
- colValid4_new.data(),
- nullptr,
- colValid5_new.data(),
- colValid6_new.data(),
- };
-
- std::chrono::steady_clock::time_point begin02 = std::chrono::steady_clock::now();
- tl->Unpack(cols_new, colsValid_new, res.data(), overflow, 0, NTuples1);
- std::chrono::steady_clock::time_point end02 = std::chrono::steady_clock::now();
- ui64 microseconds = std::chrono::duration_cast<std::chrono::microseconds>(end02 - begin02).count();
-
- if (microseconds == 0)
- microseconds = 1;
-
- CTEST << "Time for " << (NTuples1) << " transpose (external cycle)= " << microseconds << "[microseconds]" << Endl;
-#ifndef NDEBUG
- CTEST << "Result size = " << Tuples1DataBytes << Endl;
- CTEST << "Result = ";
- for (ui32 i = 0; i < Tuples1DataBytes; ++i)
- CTEST << int(res[i]) << ' ';
- CTEST << Endl;
- CTEST << "Overflow size = " << overflow.size() << Endl;
- CTEST << "Overflow = ";
- for (auto c: overflow)
- CTEST << int(c) << ' ';
- CTEST << Endl;
-#endif
-
- UNIT_ASSERT(std::memcmp(cols[0], cols_new[0], sizeof(ui64) * col1.size()) == 0);
- UNIT_ASSERT(std::memcmp(cols[1], cols_new[1], sizeof(ui32) * col2.size()) == 0);
- UNIT_ASSERT(std::memcmp(cols[2], cols_new[2], sizeof(ui32) * vcol1.size()) == 0);
- UNIT_ASSERT(std::memcmp(cols[3], cols_new[3], vcol1data.size()) == 0);
- UNIT_ASSERT(std::memcmp(cols[4], cols_new[4], sizeof(ui32) * vcol2.size()) == 0);
- UNIT_ASSERT(std::memcmp(cols[5], cols_new[5], vcol1data.size()) == 0);
- UNIT_ASSERT(std::memcmp(cols[6], cols_new[6], sizeof(ui64) * col3.size()) == 0);
- UNIT_ASSERT(std::memcmp(cols[7], cols_new[7], sizeof(ui32) * col4.size()) == 0);
-
- UNIT_ASSERT(std::memcmp(colValid.data(), colValid1_new.data(), colValid.size()) == 0);
- UNIT_ASSERT(std::memcmp(colValid.data(), colValid2_new.data(), colValid.size()) == 0);
- UNIT_ASSERT(std::memcmp(colValid.data(), colValid3_new.data(), colValid.size()) == 0);
- UNIT_ASSERT(std::memcmp(colValid.data(), colValid4_new.data(), colValid.size()) == 0);
- UNIT_ASSERT(std::memcmp(colValid.data(), colValid5_new.data(), colValid.size()) == 0);
- UNIT_ASSERT(std::memcmp(colValid.data(), colValid6_new.data(), colValid.size()) == 0);
-}
-
-Y_UNIT_TEST(PackVarSizeBig) {
-
- TScopedAlloc alloc(__LOCATION__);
-
- TColumnDesc kc1, kc2, kcv1;
-
- kc1.Role = EColumnRole::Key;
- kc1.DataSize = 1;
-
- kc2.Role = EColumnRole::Key;
- kc2.DataSize = 2;
-
- kcv1.Role = EColumnRole::Key;
- kcv1.DataSize = 1000;
- kcv1.SizeType = EColumnSizeType::Variable;
-
- std::vector<TColumnDesc> columns{kc1, kc2, kcv1 };
-
- auto tl = TTupleLayout::Create(columns);
- //CTEST << "TotalRowSize = " << tl->TotalRowSize << Endl;
- UNIT_ASSERT_VALUES_EQUAL(tl->TotalRowSize, 263);
-
- const ui64 NTuples1 = 2;
-
- const ui64 Tuples1DataBytes = (tl->TotalRowSize) * NTuples1;
-
- std::vector<ui8> col1(NTuples1, 0);
- std::vector<ui16> col2(NTuples1, 0);
-
- std::vector<ui32> vcol1(1, 0);
-
- std::vector<ui8> vcol1data;
-
- std::vector<ui8> res(Tuples1DataBytes + 64, 0);
- std::vector<TString> vcol1str {
- "zaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
- "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbb"
- "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
- "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabcdefghijklnmorstuvwxy",
- "zaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
- "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbb"
- "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
- "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbrstuv",
- };
- for (auto &&str: vcol1str) {
- for (auto c: str)
- vcol1data.push_back(c);
- vcol1.push_back(vcol1data.size());
- }
- UNIT_ASSERT_VALUES_EQUAL(vcol1.size(), NTuples1 + 1);
- for (ui32 i = 0; i < NTuples1; ++i) {
- col1[i] = (1ull<<(sizeof(col1[0])*8 - 4)) + i + 1;
- col2[i] = (2ull<<(sizeof(col2[0])*8 - 4)) + i + 1;
- }
-
- const ui8* cols[2 + 1*2];
-
- cols[0] = (ui8*) col1.data();
- cols[1] = (ui8*) col2.data();
- cols[2] = (ui8*) vcol1.data();
- cols[3] = (ui8*) vcol1data.data();
-
- std::vector<ui8> colValid((NTuples1 + 7)/8, ~0);
- const ui8 *colsValid[2 + 1*2] = {
- colValid.data(),
- colValid.data(),
- colValid.data(),
- nullptr,
- };
- std::vector<ui8, TMKQLAllocator<ui8>> overflow;
-
- std::chrono::steady_clock::time_point begin02 = std::chrono::steady_clock::now();
- tl->Pack(cols, colsValid, res.data(), overflow, 0, NTuples1);
- std::chrono::steady_clock::time_point end02 = std::chrono::steady_clock::now();
- ui64 microseconds = std::chrono::duration_cast<std::chrono::microseconds>(end02 - begin02).count();
-
- CTEST << "Time for " << (NTuples1) << " transpose (external cycle)= " << microseconds << "[microseconds]" << Endl;
-#ifndef NDEBUG
- CTEST << "Result size = " << Tuples1DataBytes << Endl;
- CTEST << "Result = ";
- for (ui32 i = 0; i < Tuples1DataBytes; ++i)
- CTEST << int(res[i]) << ' ';
- CTEST << Endl;
- CTEST << "Overflow size = " << overflow.size() << Endl;
- CTEST << "Overflow = ";
- for (auto c: overflow)
- CTEST << int(c) << ' ';
- CTEST << Endl;
-#endif
- static const ui8 expected_data[263*2] = {
- // row1
- 0xe1,0x22,0x63,0xf5, // hash
- 0x11, // col1
- 0x1, 0x20, // col2
- 0xff, 0, 0, 0, 0, 0xb, 0, 0, 0, // vcol2 [ overflow offset, overflow size ]
- 0x7a, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x61, 0x62, 0x63, 0x64, 0x65, 0x66,
- 0x67, 0x68, 0x69, 0x6a, 0x6b, 0x6c,
- 0x7, // NULL bitmap
- // row 2
- 0xab,0xa5,0x5f,0xd4, // hash
- 0x12, // col1
- 0x2, 0x20, // col2
- 0xfe, 0x7a, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61,
- 0x61, 0x61, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62,
- 0x62, 0x62, 0x72, 0x73, 0x74, 0x75, 0x76,
- 0x7, // NULLs bitmap
- };
- static const ui8 expected_overflow[11] = {
- 0x6e, 0x6d, 0x6f, 0x72, 0x73, 0x74, 0x75, 0x76, 0x77, 0x78, 0x79,
- };
- UNIT_ASSERT_VALUES_EQUAL(sizeof(expected_data), tl->TotalRowSize*NTuples1);
- UNIT_ASSERT_VALUES_EQUAL(overflow.size(), sizeof(expected_overflow));
- for (ui32 i = 0; i < sizeof(expected_data); ++i)
- UNIT_ASSERT_VALUES_EQUAL(expected_data[i], res[i]);
- for (ui32 i = 0; i < sizeof(expected_overflow); ++i)
- UNIT_ASSERT_VALUES_EQUAL(expected_overflow[i], overflow[i]);
-}
-Y_UNIT_TEST(PackIsValidFuzz) {
-
- TScopedAlloc alloc(__LOCATION__);
-
- std::mt19937 rng; // fixed-seed (0) prng
- std::vector<TColumnDesc> columns;
- std::vector<std::vector<ui8>> colsdata;
- std::vector<const ui8*> colsptr;
- std::vector<std::vector<ui8>> isValidData;
- std::vector<const ui8*> isValidPtr;
-
- ui64 totalNanoseconds = 0;
- ui64 totalSize = 0;
- ui64 totalRows = 0;
- for (ui32 test = 0; test < 10; ++test) {
- ui32 rows = 1 + (rng() % 1000);
- ui32 cols = 1 + (rng() % 100);
- columns.resize(cols);
- colsdata.resize(cols);
- colsptr.resize(cols);
- isValidData.resize(cols);
- isValidPtr.resize(cols);
- ui32 isValidSize = (rows + 7)/8;
- totalRows += rows;
- for (ui32 j = 0; j < cols; ++j) {
- auto &col = columns[j];
- col.Role = (rng() % 10 < 1) ? EColumnRole::Key : EColumnRole::Payload;
- col.DataSize = 1u <<(rng() % 16);
- col.SizeType = EColumnSizeType::Fixed;
- colsdata[j].resize(rows*col.DataSize);
- colsptr[j] = colsdata[j].data();
- isValidData[j].resize(isValidSize);
- isValidPtr[j] = isValidData[j].data();
- std::generate(isValidData[j].begin(), isValidData[j].end(), rng);
- }
- auto tl = TTupleLayout::Create(columns);
- std::vector<ui8> res;
- for (ui32 subtest = 0; subtest < 20; ++subtest) {
- ui32 subRows = 1 + (rows ? rng() % (rows - 1) : 0);
- ui32 off = subRows != rows ? rng() % (rows - subRows) : 0;
- std::vector<ui8, TMKQLAllocator<ui8>> overflow;
- totalSize += subRows*tl->TotalRowSize;
- res.resize(subRows*tl->TotalRowSize);
-
- std::chrono::steady_clock::time_point begin01 = std::chrono::steady_clock::now();
- tl->Pack(colsptr.data(), isValidPtr.data(), res.data(), overflow, off, subRows);
- std::chrono::steady_clock::time_point end01 = std::chrono::steady_clock::now();
- totalNanoseconds += std::chrono::duration_cast<std::chrono::nanoseconds>(end01 - begin01).count();
-
- UNIT_ASSERT_VALUES_EQUAL(overflow.size(), 0);
- auto resptr = res.data();
- for (ui32 row = 0; row < subRows; ++row, resptr += tl->TotalRowSize) {
- for (ui32 j = 0; j < cols; ++j) {
- auto &col = tl->Columns[j];
- UNIT_ASSERT_VALUES_EQUAL(((resptr[tl->BitmaskOffset + (j / 8)] >> (j % 8)) & 1), ((isValidData[col.OriginalIndex][(off + row) / 8] >> ((off + row) % 8)) & 1));
- }
- }
- }
- }
-
- if (totalNanoseconds == 0) totalNanoseconds = 1;
-
- CTEST << "Time for " << totalRows << " transpose (external cycle)= " << (totalNanoseconds + 999)/1000 << "[microseconds]" << Endl;
- CTEST << "Data size = " << totalSize / (1024 * 1024) << "[MB]" << Endl;
- CTEST << "Calculating speed = " << totalSize / ((totalNanoseconds + 999)/1000) << "MB/sec" << Endl;
- CTEST << Endl;
-}
-}
-
-
-
-}
-} // namespace NMiniKQL
-} // namespace NKikimr
diff --git a/yql/essentials/minikql/comp_nodes/packed_tuple/packing.h b/yql/essentials/minikql/comp_nodes/packed_tuple/packing.h
deleted file mode 100644
index 929e11fea4..0000000000
--- a/yql/essentials/minikql/comp_nodes/packed_tuple/packing.h
+++ /dev/null
@@ -1,424 +0,0 @@
-#include <util/system/unaligned_mem.h>
-#include <contrib/ydb/library/yql/utils/simd/simd.h>
-
-namespace NKikimr {
-namespace NMiniKQL {
-namespace NPackedTuple {
-
-static void
-PackTupleFallbackRowImpl(const ui8 *const src_cols[], ui8 *const dst_rows,
- const size_t cols, const size_t size,
- const size_t col_sizes[], const size_t offsets[],
- const size_t tuple_size, const size_t start = 0) {
- for (size_t row = 0; row != size; ++row) {
- for (ui8 col = 0; col != cols; ++col) {
- switch (col_sizes[col] * 8) {
-
-#define MULTY_8x4(...) \
- __VA_ARGS__(8); \
- __VA_ARGS__(16); \
- __VA_ARGS__(32); \
- __VA_ARGS__(64)
-
-#define CASE(bits) \
- case bits: \
- *reinterpret_cast<ui##bits *>(dst_rows + row * tuple_size + \
- offsets[col]) = \
- *reinterpret_cast<const ui##bits *>(src_cols[col] + \
- (start + row) * (bits / 8)); \
- break
-
- MULTY_8x4(CASE);
-
-#undef CASE
-#undef MULTY_8x4
-
- default:
- memcpy(dst_rows + row * tuple_size + offsets[col],
- src_cols[col] + (start + row) * col_sizes[col],
- col_sizes[col]);
- }
- }
- }
-}
-
-static void
-UnpackTupleFallbackRowImpl(const ui8 *const src_rows, ui8 *const dst_cols[],
- const size_t cols, const size_t size,
- const size_t col_sizes[], const size_t offsets[],
- const size_t tuple_size, const size_t start = 0) {
- for (size_t row = 0; row != size; ++row) {
- for (ui8 col = 0; col != cols; ++col) {
- switch (col_sizes[col] * 8) {
-
-#define MULTY_8x4(...) \
- __VA_ARGS__(8); \
- __VA_ARGS__(16); \
- __VA_ARGS__(32); \
- __VA_ARGS__(64)
-
-#define CASE(bits) \
- case bits: \
- *reinterpret_cast<ui##bits *>(dst_cols[col] + \
- (start + row) * (bits / 8)) = \
- *reinterpret_cast<const ui##bits *>(src_rows + row * tuple_size + \
- offsets[col]); \
- break
-
- MULTY_8x4(CASE);
-
-#undef CASE
-#undef MULTY_8x4
-
- default:
- memcpy(dst_cols[col] + (start + row) * col_sizes[col],
- src_rows + row * tuple_size + offsets[col],
- col_sizes[col]);
- }
- }
- }
-}
-
-template <class ByteType>
-Y_FORCE_INLINE static void
-PackTupleFallbackTypedColImpl(const ui8 *const src_col, ui8 *const dst_rows,
- const size_t size, const size_t tuple_size,
- const size_t start = 0) {
- static constexpr size_t BYTES = sizeof(ByteType);
- for (size_t row = 0; row != size; ++row) {
- WriteUnaligned<ByteType>(
- dst_rows + row * tuple_size,
- ReadUnaligned<ByteType>(src_col + (start + row) * BYTES));
- }
-}
-
-template <class ByteType>
-Y_FORCE_INLINE static void
-UnpackTupleFallbackTypedColImpl(const ui8 *const src_rows, ui8 *const dst_col,
- const size_t size, const size_t tuple_size,
- const size_t start = 0) {
- static constexpr size_t BYTES = sizeof(ByteType);
- for (size_t row = 0; row != size; ++row) {
- WriteUnaligned<ByteType>(
- dst_col + (start + row) * BYTES,
- ReadUnaligned<ByteType>(src_rows + row * tuple_size));
- }
-}
-
-static void
-PackTupleFallbackColImpl(const ui8 *const src_cols[], ui8 *const dst_rows,
- const size_t cols, const size_t size,
- const size_t col_sizes[], const size_t offsets[],
- const size_t tuple_size, const size_t start = 0) {
- for (ui8 col = 0; col != cols; ++col) {
- switch (col_sizes[col] * 8) {
-
-#define MULTY_8x4(...) \
- __VA_ARGS__(8); \
- __VA_ARGS__(16); \
- __VA_ARGS__(32); \
- __VA_ARGS__(64)
-
-#define CASE(bits) \
- case bits: \
- PackTupleFallbackTypedColImpl<ui##bits>( \
- src_cols[col], dst_rows + offsets[col], size, tuple_size, start); \
- break
-
- MULTY_8x4(CASE);
-
-#undef CASE
-#undef MULTY_8x4
-
- default:
- for (size_t row = 0; row != size; ++row) {
- memcpy(dst_rows + row * tuple_size + offsets[col],
- src_cols[col] + (start + row) * col_sizes[col],
- col_sizes[col]);
- }
- }
- }
-}
-
-static void
-UnpackTupleFallbackColImpl(const ui8 *const src_rows, ui8 *const dst_cols[],
- const size_t cols, const size_t size,
- const size_t col_sizes[], const size_t offsets[],
- const size_t tuple_size, const size_t start = 0) {
- for (ui8 col = 0; col != cols; ++col) {
- switch (col_sizes[col] * 8) {
-
-#define MULTY_8x4(...) \
- __VA_ARGS__(8); \
- __VA_ARGS__(16); \
- __VA_ARGS__(32); \
- __VA_ARGS__(64)
-
-#define CASE(bits) \
- case bits: \
- UnpackTupleFallbackTypedColImpl<ui##bits>( \
- src_rows + offsets[col], dst_cols[col], size, tuple_size, start); \
- break
-
- MULTY_8x4(CASE);
-
-#undef CASE
-#undef MULTY_8x4
-
- default:
- for (size_t row = 0; row != size; ++row) {
- memcpy(dst_cols[col] + (start + row) * col_sizes[col],
- src_rows + row * tuple_size + offsets[col],
- col_sizes[col]);
- }
- }
- }
-}
-
-[[maybe_unused]] static void PackTupleFallbackBlockImpl(
- const ui8 *const src_cols[], ui8 *const dst_rows, const size_t cols,
- const size_t size, const size_t col_sizes[], const size_t offsets[],
- const size_t tuple_size, const size_t block_rows, const size_t start = 0) {
-
- const size_t block_size = size / block_rows;
- for (size_t block = 0; block != block_size; ++block) {
- for (ui8 col = 0; col != cols; ++col) {
- switch (col_sizes[col] * 8) {
-
-#define BLOCK_LOOP(...) \
- for (size_t block_i = 0; block_i != block_rows; ++block_i) { \
- const size_t row = block_rows * block + block_i; \
- __VA_ARGS__ \
- }
-
-#define MULTY_8x4(...) \
- __VA_ARGS__(8); \
- __VA_ARGS__(16); \
- __VA_ARGS__(32); \
- __VA_ARGS__(64)
-
-#define CASE(bits) \
- case bits: \
- PackTupleFallbackTypedColImpl<ui##bits>( \
- src_cols[col], \
- dst_rows + block * block_rows * tuple_size + offsets[col], \
- block_rows, tuple_size, start + block * block_rows); \
- break
-
- MULTY_8x4(CASE);
-
- default:
- BLOCK_LOOP(
- memcpy(dst_rows + row * tuple_size + offsets[col],
- src_cols[col] + (start + row) * col_sizes[col],
- col_sizes[col]);)
-
-#undef CASE
-#undef MULTY_8x4
-#undef BLOCK_LOOP
- }
- }
- }
-
- PackTupleFallbackColImpl(
- src_cols, dst_rows + block_size * block_rows * tuple_size, cols,
- size - block_size * block_rows, col_sizes, offsets, tuple_size,
- start + block_size * block_rows);
-}
-
-[[maybe_unused]] static void UnpackTupleFallbackBlockImpl(
- const ui8 *const src_rows, ui8 *const dst_cols[], const size_t cols,
- const size_t size, const size_t col_sizes[], const size_t offsets[],
- const size_t tuple_size, const size_t block_rows, const size_t start = 0) {
-
- const size_t block_size = size / block_rows;
- for (size_t block = 0; block != block_size; ++block) {
- for (ui8 col = 0; col != cols; ++col) {
- switch (col_sizes[col] * 8) {
-
-#define BLOCK_LOOP(...) \
- for (size_t block_i = 0; block_i != block_rows; ++block_i) { \
- const size_t row = block_rows * block + block_i; \
- __VA_ARGS__ \
- }
-
-#define MULTY_8x4(...) \
- __VA_ARGS__(8); \
- __VA_ARGS__(16); \
- __VA_ARGS__(32); \
- __VA_ARGS__(64)
-
-#define CASE(bits) \
- case bits: \
- UnpackTupleFallbackTypedColImpl<ui##bits>( \
- src_rows + block * block_rows * tuple_size + offsets[col], \
- dst_cols[col], block_rows, tuple_size, \
- start + block * block_rows); \
- break
-
- MULTY_8x4(CASE);
-
- default:
- BLOCK_LOOP(
- memcpy(dst_cols[col] + (start + row) * col_sizes[col],
- src_rows + row * tuple_size + offsets[col],
- col_sizes[col]);)
-
-#undef CASE
-#undef MULTY_8x4
-#undef BLOCK_LOOP
- }
- }
- }
-
- UnpackTupleFallbackColImpl(src_rows + block_size * block_rows * tuple_size,
- dst_cols, cols, size - block_size * block_rows,
- col_sizes, offsets, tuple_size,
- start + block_size * block_rows);
-}
-
-template <class TTraits> struct SIMDPack {
- template <class T> using TSimd = typename TTraits::template TSimd8<T>;
-
- static TSimd<ui8> BuildTuplePerm(size_t col_size, size_t col_pad,
- ui8 offset, ui8 ind, bool packing) {
- ui8 perm[TSimd<ui8>::SIZE];
- std::memset(perm, 0x80, TSimd<ui8>::SIZE);
-
- size_t iters = std::max(size_t(1u), TSimd<ui8>::SIZE / (col_size + col_pad));
- while (iters--) {
- for (size_t it = col_size; it; --it, ++offset, ++ind) {
- if (packing) {
- perm[offset] = ind;
- } else {
- perm[ind] = offset;
- }
- }
- offset += col_pad;
- }
-
- return TSimd<ui8>{perm};
- }
-
- template <ui8 TupleSize> static TSimd<ui8> TupleOr(TSimd<ui8> vec[]) {
- return TupleOrImpl<TupleSize>(vec);
- }
-
- template <ui8 TupleSize> static TSimd<ui8> TupleOrImpl(TSimd<ui8> vec[]) {
- static constexpr ui8 Left = TupleSize / 2;
- static constexpr ui8 Right = TupleSize - Left;
-
- return TupleOrImpl<Left>(vec) | TupleOrImpl<Right>(vec + Left);
- }
-
- template <> TSimd<ui8> TupleOrImpl<0>(TSimd<ui8>[]) { std::abort(); }
-
- template <> TSimd<ui8> TupleOrImpl<1>(TSimd<ui8> vec[]) { return vec[0]; }
-
- template <> TSimd<ui8> TupleOrImpl<2>(TSimd<ui8> vec[]) {
- return vec[0] | vec[1];
- }
-
- template <ui8 StoresPerLoad, ui8 Cols>
- static void
- PackTupleOrImpl(const ui8 *const src_cols[], ui8 *const dst_rows,
- const size_t size, const size_t col_sizes[],
- const size_t offsets[], const size_t tuple_size,
- const TSimd<ui8> perms[], const size_t start = 0) {
- static constexpr size_t kSIMD_Rem = sizeof(TSimd<ui8>) - StoresPerLoad;
- const ui8 tuples_per_store =
- std::max(size_t(1u), TSimd<ui8>::SIZE / tuple_size);
- const size_t simd_iters = (size > kSIMD_Rem ? size - kSIMD_Rem : 0) /
- (tuples_per_store * StoresPerLoad);
-
- TSimd<ui8> src_regs[Cols];
- TSimd<ui8> perm_regs[Cols];
-
- const ui8 *srcs[Cols];
- std::memcpy(srcs, src_cols, sizeof(srcs));
- for (ui8 col = 0; col != Cols; ++col) {
- srcs[col] += col_sizes[col] * start;
- }
-
- auto dst = dst_rows;
- ui8 *const end = dst_rows + simd_iters * tuples_per_store *
- StoresPerLoad * tuple_size;
- while (dst != end) {
- for (ui8 col = 0; col != Cols; ++col) {
- src_regs[col] = TSimd<ui8>(srcs[col]);
- srcs[col] += col_sizes[col] * tuples_per_store * StoresPerLoad;
- }
-
- for (ui8 iter = 0; iter != StoresPerLoad; ++iter) {
- // shuffling each col bytes to the right positions
- // then blending them together with 'or'
- for (ui8 col = 0; col != Cols; ++col) {
- perm_regs[col] = src_regs[col].Shuffle(
- perms[col * StoresPerLoad + iter]);
- }
-
- TupleOr<Cols>(perm_regs).Store(dst);
- dst += tuple_size * tuples_per_store;
- }
- }
-
- PackTupleFallbackRowImpl(srcs, dst, Cols,
- size - simd_iters * tuples_per_store *
- StoresPerLoad,
- col_sizes, offsets, tuple_size);
- }
-
- template <ui8 LoadsPerStore, ui8 Cols>
- static void
- UnpackTupleOrImpl(const ui8 *const src_rows, ui8 *const dst_cols[],
- size_t size, const size_t col_sizes[],
- const size_t offsets[], const size_t tuple_size,
- const TSimd<ui8> perms[], const size_t start = 0) {
- static constexpr size_t kSIMD_Rem = sizeof(TSimd<ui8>) - LoadsPerStore;
- const ui8 tuples_per_load =
- std::max(size_t(1u), TSimd<ui8>::SIZE / tuple_size);
- const size_t simd_iters = (size > kSIMD_Rem ? size - kSIMD_Rem : 0) /
- (tuples_per_load * LoadsPerStore);
-
- TSimd<ui8> src_regs[LoadsPerStore];
- TSimd<ui8> perm_regs[LoadsPerStore];
-
- auto src = src_rows;
- const ui8 *const end = src_rows + simd_iters * tuples_per_load *
- LoadsPerStore * tuple_size;
-
- ui8 *dsts[Cols];
- std::memcpy(dsts, dst_cols, sizeof(dsts));
- for (ui8 col = 0; col != Cols; ++col) {
- dsts[col] += col_sizes[col] * start;
- }
-
- while (src != end) {
- for (ui8 iter = 0; iter != LoadsPerStore; ++iter) {
- src_regs[iter] = TSimd<ui8>(src);
- src += tuple_size * tuples_per_load;
- }
-
- for (ui8 col = 0; col != Cols; ++col) {
- // shuffling each col bytes to the right positions
- // then blending them together with 'or'
- for (ui8 iter = 0; iter != LoadsPerStore; ++iter) {
- perm_regs[iter] = src_regs[iter].Shuffle(
- perms[col * LoadsPerStore + iter]);
- }
-
- TupleOr<LoadsPerStore>(perm_regs).Store(dsts[col]);
- dsts[col] += col_sizes[col] * tuples_per_load * LoadsPerStore;
- }
- }
-
- UnpackTupleFallbackRowImpl(src, dsts, Cols,
- size - simd_iters * tuples_per_load *
- LoadsPerStore,
- col_sizes, offsets, tuple_size);
- }
-};
-
-} // namespace NPackedTuple
-} // namespace NMiniKQL
-} // namespace NKikimr
diff --git a/yql/essentials/minikql/comp_nodes/packed_tuple/tuple.cpp b/yql/essentials/minikql/comp_nodes/packed_tuple/tuple.cpp
deleted file mode 100644
index 8a51b8c123..0000000000
--- a/yql/essentials/minikql/comp_nodes/packed_tuple/tuple.cpp
+++ /dev/null
@@ -1,983 +0,0 @@
-#include "tuple.h"
-
-#include <algorithm>
-#include <queue>
-
-#include <yql/essentials/minikql/mkql_node.h>
-#include <yql/essentials/public/udf/udf_data_type.h>
-#include <yql/essentials/public/udf/udf_types.h>
-#include <yql/essentials/public/udf/udf_value.h>
-
-#include <util/generic/bitops.h>
-#include <util/generic/buffer.h>
-
-#include "hashes_calc.h"
-#include "packing.h"
-
-namespace NKikimr {
-namespace NMiniKQL {
-namespace NPackedTuple {
-
-namespace {
-
-// Transpose 8x8 bit-matrix packed in ui64 integer
-Y_FORCE_INLINE ui64 transposeBitmatrix(ui64 x) {
- if (x == 0xFFFFFFFFFFFFFFFFLL) {
- return x;
- }
-
- // a b A B aa bb AA BB
- // c d C D cc dd CC DD
- // ->
- // a c A C aa cc AA CC
- // b d B D bb dd BB DD
- // a b A B aa bb AA BB // c d C D cc dd CC DD
- // a c A C aa cc AA CC // b d B D bb dd BB DD
- x = ((x &
- 0b10101010'01010101'10101010'01010101'10101010'01010101'10101010'01010101ull)) |
- ((x &
- 0b01010101'00000000'01010101'00000000'01010101'00000000'01010101'00000000ull) >>
- 7) |
- ((x &
- 0b00000000'10101010'00000000'10101010'00000000'10101010'00000000'10101010ull)
- << 7);
- // a1 a2 b1 b2 A1 A2 B1 B2
- // a3 a4 b3 b4 A3 A4 B3 B4
- // c1 c2 d1 d2 C1 C2 D1 D2
- // c3 c4 d3 d4 C3 C4 D3 D4
- // ->
- // a1 a2 c1 c2 A1 A2 C1 C2
- // a3 a4 c3 c4 A3 A4 C3 C4
- // b1 b2 d1 d2 B1 B2 D1 D2
- // b3 b4 d3 d4 B3 B4 D3 D4
- //
- //
- // a1 a2 b1 b2 A1 A2 B1 B2 // a3 a4 b3 b4 A3 A4 B3 B4 // c1 c2 d1 d2 C1 C2
- // D1 D2 // c3 c4 d3 d4 C3 C4 D3 D4
- // ->
- // a1 a2 c1 c2 A1 A2 C1 C2 // a3 a4 c3 c4 A3 A4 C3 C4 // b1 b2 d1 d2 B1 B2
- // D1 D2 // b3 b4 d3 d4 B3 B4 D3 D4
- x = ((x &
- 0b1100110011001100'0011001100110011'1100110011001100'0011001100110011ull)) |
- ((x &
- 0b0011001100110011'0000000000000000'0011001100110011'0000000000000000ull) >>
- 14) |
- ((x &
- 0b0000000000000000'1100110011001100'0000000000000000'1100110011001100ull)
- << 14);
- x = ((x &
- 0b11110000111100001111000011110000'00001111000011110000111100001111ull)) |
- ((x &
- 0b00001111000011110000111100001111'00000000000000000000000000000000ull) >>
- 28) |
- ((x &
- 0b00000000000000000000000000000000'11110000111100001111000011110000ull)
- << 28);
- return x;
-}
-
-void transposeBitmatrix(ui8 dst[], const ui8 *src[], const size_t row_size) {
- ui64 x = 0;
- for (size_t ind = 0; ind != 8; ++ind) {
- x |= ui64(*src[ind]) << (ind * 8);
- }
-
- x = transposeBitmatrix(x);
-
- for (size_t ind = 0; ind != 8; ++ind) {
- dst[ind * row_size] = x;
- x >>= 8;
- }
-}
-
-void transposeBitmatrix(ui8 *dst[], const ui8 src[], const size_t row_size) {
- ui64 x = 0;
- for (size_t ind = 0; ind != 8; ++ind) {
- x |= ui64(src[ind * row_size]) << (ind * 8);
- }
-
- x = transposeBitmatrix(x);
-
- for (size_t ind = 0; ind != 8; ++ind) {
- *dst[ind] = x;
- x >>= 8;
- }
-}
-
-} // namespace
-
-THolder<TTupleLayout>
-TTupleLayout::Create(const std::vector<TColumnDesc> &columns) {
-
- if (NX86::HaveAVX2())
- return MakeHolder<TTupleLayoutFallback<NSimd::TSimdAVX2Traits>>(
- columns);
-
- if (NX86::HaveSSE42())
- return MakeHolder<TTupleLayoutFallback<NSimd::TSimdSSE42Traits>>(
- columns);
-
- return MakeHolder<TTupleLayoutFallback<NSimd::TSimdFallbackTraits>>(
- columns);
-}
-
-template <typename TTraits>
-TTupleLayoutFallback<TTraits>::TTupleLayoutFallback(
- const std::vector<TColumnDesc> &columns)
- : TTupleLayout(columns) {
-
- for (ui32 i = 0, idx = 0; i < OrigColumns.size(); ++i) {
- auto &col = OrigColumns[i];
-
- col.OriginalIndex = idx;
-
- if (col.SizeType == EColumnSizeType::Variable) {
- // we cannot handle (rare) overflow strings unless we have at least
- // space for header; size of inlined strings is limited to 254
- // bytes, limit maximum inline data size
- col.DataSize = std::max<ui32>(1 + 2 * sizeof(ui32),
- std::min<ui32>(255, col.DataSize));
- idx += 2; // Variable-size takes two columns: one for offsets, and
- // another for payload
- } else {
- idx += 1;
- }
-
- if (col.Role == EColumnRole::Key) {
- KeyColumns.push_back(col);
- } else {
- PayloadColumns.push_back(col);
- }
- }
-
- KeyColumnsNum = KeyColumns.size();
-
- auto ColumnDescLess = [](const TColumnDesc &a, const TColumnDesc &b) {
- if (a.SizeType != b.SizeType) // Fixed first
- return a.SizeType == EColumnSizeType::Fixed;
-
- if (a.DataSize == b.DataSize)
- // relative order of (otherwise) same key columns must be preserved
- return a.OriginalIndex < b.OriginalIndex;
-
- return a.DataSize < b.DataSize;
- };
-
- std::sort(KeyColumns.begin(), KeyColumns.end(), ColumnDescLess);
- std::sort(PayloadColumns.begin(), PayloadColumns.end(), ColumnDescLess);
-
- KeyColumnsFixedEnd = 0;
-
- ui32 currOffset = 4; // crc32 hash in the beginning
- KeyColumnsOffset = currOffset;
- KeyColumnsFixedNum = KeyColumnsNum;
-
- for (ui32 i = 0; i < KeyColumnsNum; ++i) {
- auto &col = KeyColumns[i];
-
- if (col.SizeType == EColumnSizeType::Variable &&
- KeyColumnsFixedEnd == 0) {
- KeyColumnsFixedEnd = currOffset;
- KeyColumnsFixedNum = i;
- }
-
- col.ColumnIndex = i;
- col.Offset = currOffset;
- Columns.push_back(col);
- currOffset += col.DataSize;
- }
-
- KeyColumnsEnd = currOffset;
-
- if (KeyColumnsFixedEnd == 0) // >= 4 if was ever assigned
- KeyColumnsFixedEnd = KeyColumnsEnd;
-
- KeyColumnsSize = KeyColumnsEnd - KeyColumnsOffset;
- BitmaskOffset = currOffset;
-
- BitmaskSize = (OrigColumns.size() + 7) / 8;
-
- currOffset += BitmaskSize;
- BitmaskEnd = currOffset;
-
- PayloadOffset = currOffset;
-
- for (ui32 i = 0; i < PayloadColumns.size(); ++i) {
- auto &col = PayloadColumns[i];
- col.ColumnIndex = KeyColumnsNum + i;
- col.Offset = currOffset;
- Columns.push_back(col);
- currOffset += col.DataSize;
- }
-
- PayloadEnd = currOffset;
- PayloadSize = PayloadEnd - PayloadOffset;
-
- TotalRowSize = currOffset;
-
- for (auto &col : Columns) {
- if (col.SizeType == EColumnSizeType::Variable) {
- VariableColumns_.push_back(col);
- } else if (IsPowerOf2(col.DataSize) &&
- col.DataSize < (1u << FixedPOTColumns_.size())) {
- FixedPOTColumns_[CountTrailingZeroBits(col.DataSize)].push_back(
- col);
- } else {
- FixedNPOTColumns_.push_back(col);
- }
- }
-
- /// TODO: dynamic configuration
- BlockRows_ = 256;
- const bool use_simd = true;
-
- std::vector<const TColumnDesc *> block_fallback;
- std::queue<const TColumnDesc *> next_cols;
-
- size_t fixed_cols_left =
- KeyColumnsFixedNum +
- std::accumulate(PayloadColumns.begin(), PayloadColumns.end(), 0ul,
- [](size_t prev, const auto &col) {
- return prev +
- (col.SizeType == EColumnSizeType::Fixed);
- });
-
- size_t prev_tuple_size;
- size_t curr_tuple_size = 0;
-
- const auto manage_block_packing = [&](const std::vector<TColumnDesc>
- &columns) {
- for (size_t col_ind = 0;
- col_ind != columns.size() &&
- columns[col_ind].SizeType == EColumnSizeType::Fixed;) {
- --fixed_cols_left;
- next_cols.push(&columns[col_ind]);
- prev_tuple_size = curr_tuple_size;
- curr_tuple_size = next_cols.back()->Offset +
- next_cols.back()->DataSize -
- next_cols.front()->Offset;
-
- ++col_ind;
- if (curr_tuple_size >= TSimd<ui8>::SIZE ||
- next_cols.size() == kSIMDMaxCols || !fixed_cols_left) {
- const bool oversize = curr_tuple_size > TSimd<ui8>::SIZE;
- const size_t tuple_size =
- oversize ? prev_tuple_size : curr_tuple_size;
- const size_t tuple_cols = next_cols.size() - oversize;
-
- if (!use_simd || !tuple_cols ||
- (Columns.size() != next_cols.size() &&
- tuple_size < TSimd<ui8>::SIZE * 7 / 8) ||
- tuple_size > TSimd<ui8>::SIZE ||
- (!SIMDBlock_.empty() &&
- TotalRowSize - next_cols.front()->Offset <
- TSimd<ui8>::SIZE)) {
- block_fallback.push_back(next_cols.front());
- next_cols.pop();
- continue;
- }
-
- SIMDDesc simd_desc;
- simd_desc.Cols = tuple_cols;
- simd_desc.PermMaskOffset = SIMDPermMasks_.size();
- simd_desc.RowOffset = next_cols.front()->Offset;
-
- const TColumnDesc *col_descs[kSIMDMaxCols];
- ui32 col_max_size = 0;
- for (ui8 col_ind = 0; col_ind != simd_desc.Cols; ++col_ind) {
- col_descs[col_ind] = next_cols.front();
- col_max_size =
- std::max(col_max_size, col_descs[col_ind]->DataSize);
- next_cols.pop();
- }
-
- simd_desc.InnerLoopIters = std::min(
- size_t(kSIMDMaxInnerLoopSize),
- (TSimd<ui8>::SIZE / col_max_size) /
- std::max(size_t(1u), size_t(TSimd<ui8>::SIZE / TotalRowSize)));
-
- const auto tuples_per_register =
- std::max(1u, TSimd<ui8>::SIZE / TotalRowSize);
-
- for (ui8 col_ind = 0; col_ind != simd_desc.Cols; ++col_ind) {
- const auto &col_desc = col_descs[col_ind];
- const size_t offset =
- col_desc->Offset - simd_desc.RowOffset;
-
- BlockFixedColsSizes_.push_back(col_desc->DataSize);
- BlockColsOffsets_.push_back(offset);
- BlockColumnsOrigInds_.push_back(col_desc->OriginalIndex);
- }
-
- for (size_t packing_flag = 1; packing_flag != 3;
- ++packing_flag) {
- for (ui8 col_ind = 0; col_ind != simd_desc.Cols;
- ++col_ind) {
- const auto &col_desc = col_descs[col_ind];
- const size_t offset =
- col_desc->Offset - simd_desc.RowOffset;
-
- for (ui8 ind = 0; ind != simd_desc.InnerLoopIters;
- ++ind) {
- SIMDPermMasks_.push_back(
- SIMDPack<TTraits>::BuildTuplePerm(
- col_desc->DataSize,
- TotalRowSize - col_desc->DataSize, offset,
- ind * col_desc->DataSize *
- tuples_per_register,
- packing_flag % 2));
- }
- }
- }
-
- SIMDBlock_.push_back(simd_desc);
- }
- }
-
- while (!next_cols.empty()) {
- block_fallback.push_back(next_cols.front());
- next_cols.pop();
- }
- };
-
- manage_block_packing(KeyColumns);
- manage_block_packing(PayloadColumns);
-
- for (const auto col_desc_p : block_fallback) {
- BlockColsOffsets_.push_back(col_desc_p->Offset);
- BlockFixedColsSizes_.push_back(col_desc_p->DataSize);
- BlockColumnsOrigInds_.push_back(col_desc_p->OriginalIndex);
- }
-}
-
-// Columns (SoA) format:
-// for fixed size: packed data
-// for variable size: offset (ui32) into next column; size of colum is
-// rowCount + 1
-//
-// Row (AoS) format:
-// fixed size: packed data
-// variable size:
-// assumes DataSize <= 255 && DataSize >= 1 + 2*4
-// if size of payload is less than col.DataSize:
-// u8 one byte of size (0..254)
-// u8 [size] data
-// u8 [DataSize - 1 - size] padding
-// if size of payload is greater than DataSize:
-// u8 = 255
-// u32 = offset in overflow buffer
-// u32 = size
-// u8 [DataSize - 1 - 2*4] initial bytes of data
-// Data is expected to be consistent with isValidBitmask (0 for fixed-size,
-// empty for variable-size)
-template <>
-void TTupleLayoutFallback<NSimd::TSimdFallbackTraits>::Pack(
- const ui8 **columns, const ui8 **isValidBitmask, ui8 *res,
- std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start,
- ui32 count) const {
- using TTraits = NSimd::TSimdFallbackTraits;
-
- std::vector<ui64> bitmaskMatrix(BitmaskSize);
-
- if (auto off = (start % 8)) {
- auto bitmaskIdx = start / 8;
-
- for (ui32 j = Columns.size(); j--;)
- bitmaskMatrix[j / 8] |=
- ui64(isValidBitmask[Columns[j].OriginalIndex][bitmaskIdx])
- << ((j % 8) * 8);
-
- for (auto &m : bitmaskMatrix) {
- m = transposeBitmatrix(m);
- m >>= off * 8;
- }
- }
-
- for (; count--; ++start, res += TotalRowSize) {
- ui32 hash = 0;
- auto bitmaskIdx = start / 8;
-
- bool anyOverflow = false;
-
- for (ui32 i = KeyColumnsFixedNum; i < KeyColumns.size(); ++i) {
- auto &col = KeyColumns[i];
- ui32 dataOffset = ReadUnaligned<ui32>(columns[col.OriginalIndex] +
- sizeof(ui32) * start);
- ui32 nextOffset = ReadUnaligned<ui32>(columns[col.OriginalIndex] +
- sizeof(ui32) * (start + 1));
- auto size = nextOffset - dataOffset;
-
- if (size >= col.DataSize) {
- anyOverflow = true;
- break;
- }
- }
-
- if ((start % 8) == 0) {
- std::fill(bitmaskMatrix.begin(), bitmaskMatrix.end(), 0);
- for (ui32 j = Columns.size(); j--;)
- bitmaskMatrix[j / 8] |=
- ui64(isValidBitmask[Columns[j].OriginalIndex][bitmaskIdx])
- << ((j % 8) * 8);
- for (auto &m : bitmaskMatrix)
- m = transposeBitmatrix(m);
- }
-
- for (ui32 j = 0; j < BitmaskSize; ++j) {
- res[BitmaskOffset + j] = ui8(bitmaskMatrix[j]);
- bitmaskMatrix[j] >>= 8;
- }
-
- for (auto &col : FixedNPOTColumns_) {
- std::memcpy(res + col.Offset,
- columns[col.OriginalIndex] + start * col.DataSize,
- col.DataSize);
- }
-
-#define PackPOTColumn(POT) \
- for (auto &col : FixedPOTColumns_[POT]) { \
- std::memcpy(res + col.Offset, \
- columns[col.OriginalIndex] + start * (1u << POT), \
- 1u << POT); \
- }
-
- PackPOTColumn(0);
- PackPOTColumn(1);
- PackPOTColumn(2);
- PackPOTColumn(3);
- PackPOTColumn(4);
-#undef PackPOTColumn
-
- for (auto &col : VariableColumns_) {
- auto dataOffset = ReadUnaligned<ui32>(columns[col.OriginalIndex] +
- sizeof(ui32) * start);
- auto nextOffset = ReadUnaligned<ui32>(columns[col.OriginalIndex] +
- sizeof(ui32) * (start + 1));
- auto size = nextOffset - dataOffset;
- auto data = columns[col.OriginalIndex + 1] + dataOffset;
-
- if (size >= col.DataSize) {
- res[col.Offset] = 255;
-
- ui32 prefixSize = (col.DataSize - 1 - 2 * sizeof(ui32));
- auto overflowSize = size - prefixSize;
- auto overflowOffset = overflow.size();
-
- overflow.resize(overflowOffset + overflowSize);
-
- WriteUnaligned<ui32>(res + col.Offset + 1 + 0 * sizeof(ui32),
- overflowOffset);
- WriteUnaligned<ui32>(res + col.Offset + 1 + 1 * sizeof(ui32),
- overflowSize);
- std::memcpy(res + col.Offset + 1 + 2 * sizeof(ui32), data,
- prefixSize);
- std::memcpy(overflow.data() + overflowOffset, data + prefixSize,
- overflowSize);
- } else {
- Y_DEBUG_ABORT_UNLESS(size < 255);
- res[col.Offset] = size;
- std::memcpy(res + col.Offset + 1, data, size);
- std::memset(res + col.Offset + 1 + size, 0,
- col.DataSize - (size + 1));
- }
-
- if (anyOverflow && col.Role == EColumnRole::Key) {
- hash =
- CalculateCRC32<TTraits>((ui8 *)&size, sizeof(ui32), hash);
- hash = CalculateCRC32<TTraits>(data, size, hash);
- }
- }
-
- // isValid bitmap is NOT included into hashed data
- if (anyOverflow) {
- hash = CalculateCRC32<TTraits>(
- res + KeyColumnsOffset, KeyColumnsFixedEnd - KeyColumnsOffset,
- hash);
- } else {
- hash = CalculateCRC32<TTraits>(res + KeyColumnsOffset,
- KeyColumnsEnd - KeyColumnsOffset);
- }
- WriteUnaligned<ui32>(res, hash);
- }
-}
-
-template <>
-void TTupleLayoutFallback<NSimd::TSimdFallbackTraits>::Unpack(
- ui8 **columns, ui8 **isValidBitmask, const ui8 *res,
- const std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start,
- ui32 count) const {
- std::vector<ui64> bitmaskMatrix(BitmaskSize, 0);
-
- {
- const auto bitmaskIdx = start / 8;
- const auto bitmaskShift = start % 8;
- const auto bitmaskIdxC = (start + count) / 8;
- const auto bitmaskShiftC = (start + count) % 8;
-
- /// ready first bitmatrix bytes
- for (ui32 j = Columns.size(); j--;)
- bitmaskMatrix[j / 8] |=
- (isValidBitmask[Columns[j].OriginalIndex][bitmaskIdx] &
- ~(0xFF << bitmaskShift))
- << ((j % 8) * 8);
-
- /// ready last (which are same as above) bitmatrix bytes if needed
- if (bitmaskIdx == bitmaskIdxC)
- for (ui32 j = Columns.size(); j--;)
- bitmaskMatrix[j / 8] |=
- (isValidBitmask[Columns[j].OriginalIndex][bitmaskIdxC] &
- (0xFF << bitmaskShiftC))
- << ((j % 8) * 8);
-
- for (auto &m : bitmaskMatrix)
- m = transposeBitmatrix(m);
- }
-
- for (auto ind = 0; ind != start % 8; ++ind) {
- for (ui32 j = 0; j < BitmaskSize; ++j) {
- bitmaskMatrix[j] |=
- ui64(
- (res - (start % 8 - ind) * TotalRowSize)[BitmaskOffset + j])
- << (ind * 8);
- }
- }
-
- for (; count--; ++start, res += TotalRowSize) {
- const auto bitmaskIdx = start / 8;
- const auto bitmaskShift = start % 8;
-
- for (ui32 j = 0; j < BitmaskSize; ++j) {
- bitmaskMatrix[j] |= ui64(res[BitmaskOffset + j])
- << (bitmaskShift * 8);
- }
-
- if (bitmaskShift == 7 || count == 0) {
- for (auto &m : bitmaskMatrix)
- m = transposeBitmatrix(m);
- for (ui32 j = Columns.size(); j--;)
- isValidBitmask[Columns[j].OriginalIndex][bitmaskIdx] =
- ui8(bitmaskMatrix[j / 8] >> ((j % 8) * 8));
- std::fill(bitmaskMatrix.begin(), bitmaskMatrix.end(), 0);
-
- if (count && count < 8) {
- /// ready last bitmatrix bytes
- for (ui32 j = Columns.size(); j--;)
- bitmaskMatrix[j / 8] |=
- (isValidBitmask[Columns[j].OriginalIndex]
- [bitmaskIdx + 1] &
- (0xFF << count))
- << ((j % 8) * 8);
-
- for (auto &m : bitmaskMatrix)
- m = transposeBitmatrix(m);
- }
- }
-
- for (auto &col : FixedNPOTColumns_) {
- std::memcpy(columns[col.OriginalIndex] + start * col.DataSize,
- res + col.Offset, col.DataSize);
- }
-
-#define PackPOTColumn(POT) \
- for (auto &col : FixedPOTColumns_[POT]) { \
- std::memcpy(columns[col.OriginalIndex] + start * (1u << POT), \
- res + col.Offset, 1u << POT); \
- }
- PackPOTColumn(0);
- PackPOTColumn(1);
- PackPOTColumn(2);
- PackPOTColumn(3);
- PackPOTColumn(4);
-#undef PackPOTColumn
-
- for (auto &col : VariableColumns_) {
- const auto dataOffset = ReadUnaligned<ui32>(
- columns[col.OriginalIndex] + sizeof(ui32) * start);
- auto *const data = columns[col.OriginalIndex + 1] + dataOffset;
-
- ui32 size = ReadUnaligned<ui8>(res + col.Offset);
-
- if (size < 255) { // embedded str
- std::memcpy(data, res + col.Offset + 1, size);
- } else { // overflow buffer used
- const auto prefixSize = (col.DataSize - 1 - 2 * sizeof(ui32));
- const auto overflowOffset = ReadUnaligned<ui32>(
- res + col.Offset + 1 + 0 * sizeof(ui32));
- const auto overflowSize = ReadUnaligned<ui32>(
- res + col.Offset + 1 + 1 * sizeof(ui32));
-
- std::memcpy(data, res + col.Offset + 1 + 2 * sizeof(ui32),
- prefixSize);
- std::memcpy(data + prefixSize, overflow.data() + overflowOffset,
- overflowSize);
-
- size = prefixSize + overflowSize;
- }
-
- WriteUnaligned<ui32>(columns[col.OriginalIndex] +
- sizeof(ui32) * (start + 1),
- dataOffset + size);
- }
- }
-}
-
-#define MULTI_8_I(C, i) \
- C(i, 0) C(i, 1) C(i, 2) C(i, 3) C(i, 4) C(i, 5) C(i, 6) C(i, 7)
-#define MULTI_8(C, A) \
- C(A, 0) C(A, 1) C(A, 2) C(A, 3) C(A, 4) C(A, 5) C(A, 6) C(A, 7)
-
-template <typename TTraits>
-void TTupleLayoutFallback<TTraits>::Pack(
- const ui8 **columns, const ui8 **isValidBitmask, ui8 *res,
- std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start,
- ui32 count) const {
- std::vector<const ui8 *> block_columns;
- for (const auto col_ind : BlockColumnsOrigInds_) {
- block_columns.push_back(columns[col_ind]);
- }
-
- for (size_t row_ind = 0; row_ind < count; row_ind += BlockRows_) {
- const size_t cur_block_size = std::min(count - row_ind, BlockRows_);
- size_t cols_past = 0;
-
- for (const auto &simd_block : SIMDBlock_) {
-#define CASE(i, j) \
- case i *kSIMDMaxCols + j: \
- SIMDPack<TTraits>::template PackTupleOrImpl<i + 1, j + 1>( \
- block_columns.data() + cols_past, res + simd_block.RowOffset, \
- cur_block_size, BlockFixedColsSizes_.data() + cols_past, \
- BlockColsOffsets_.data() + cols_past, TotalRowSize, \
- SIMDPermMasks_.data() + simd_block.PermMaskOffset, start); \
- break;
-
- switch ((simd_block.InnerLoopIters - 1) * kSIMDMaxCols +
- simd_block.Cols - 1) {
- MULTI_8(MULTI_8_I, CASE)
-
- default:
- std::abort();
- }
-
-#undef CASE
-
- cols_past += simd_block.Cols;
- }
-
- PackTupleFallbackColImpl(
- block_columns.data() + cols_past, res,
- BlockColsOffsets_.size() - cols_past, cur_block_size,
- BlockFixedColsSizes_.data() + cols_past,
- BlockColsOffsets_.data() + cols_past, TotalRowSize, start);
-
- for (ui32 cols_ind = 0; cols_ind < Columns.size(); cols_ind += 8) {
- const ui8 *bitmasks[8];
- const size_t cols = std::min<size_t>(8ul, Columns.size() - cols_ind);
- for (size_t ind = 0; ind != cols; ++ind) {
- const auto &col = Columns[cols_ind + ind];
- bitmasks[ind] = isValidBitmask[col.OriginalIndex] + start / 8;
- }
- const ui8 ones_byte = 0xFF;
- for (size_t ind = cols; ind != 8; ++ind) {
- // dereferencable + all-ones fast path
- bitmasks[ind] = &ones_byte;
- }
-
- const auto advance_masks = [&] {
- for (size_t ind = 0; ind != cols; ++ind) {
- ++bitmasks[ind];
- }
- };
-
- const size_t first_full_byte =
- std::min<size_t>((8ul - start) & 7, cur_block_size);
- size_t block_row_ind = 0;
-
- const auto simple_mask_transpose = [&](const size_t until) {
- for (; block_row_ind < until; ++block_row_ind) {
- const auto shift = (start + block_row_ind) % 8;
-
- const auto new_res = res + block_row_ind * TotalRowSize;
- const auto res = new_res;
-
- res[BitmaskOffset + cols_ind / 8] = 0;
- for (size_t col_ind = 0; col_ind != cols; ++col_ind) {
- res[BitmaskOffset + cols_ind / 8] |=
- ((bitmasks[col_ind][0] >> shift) & 1u) << col_ind;
- }
- }
- };
-
- simple_mask_transpose(first_full_byte);
- if (first_full_byte) {
- advance_masks();
- }
-
- for (; block_row_ind + 7 < cur_block_size; block_row_ind += 8) {
- transposeBitmatrix(res + block_row_ind * TotalRowSize +
- BitmaskOffset + cols_ind / 8,
- bitmasks, TotalRowSize);
- advance_masks();
- }
-
- simple_mask_transpose(cur_block_size);
- }
-
- for (size_t block_row_ind = 0; block_row_ind != cur_block_size;
- ++block_row_ind) {
-
- const auto new_start = start + block_row_ind;
- const auto start = new_start;
-
- const auto new_res = res + block_row_ind * TotalRowSize;
- const auto res = new_res;
-
- ui32 hash = 0;
- bool anyOverflow = false;
-
- for (ui32 i = KeyColumnsFixedNum; i < KeyColumns.size(); ++i) {
- auto &col = KeyColumns[i];
- auto dataOffset = ReadUnaligned<ui32>(
- columns[col.OriginalIndex] + sizeof(ui32) * start);
- auto nextOffset = ReadUnaligned<ui32>(
- columns[col.OriginalIndex] + sizeof(ui32) * (start + 1));
- auto size = nextOffset - dataOffset;
-
- if (size >= col.DataSize) {
- anyOverflow = true;
- break;
- }
- }
-
- for (auto &col : VariableColumns_) {
- auto dataOffset = ReadUnaligned<ui32>(
- columns[col.OriginalIndex] + sizeof(ui32) * start);
- auto nextOffset = ReadUnaligned<ui32>(
- columns[col.OriginalIndex] + sizeof(ui32) * (start + 1));
- auto size = nextOffset - dataOffset;
- auto data = columns[col.OriginalIndex + 1] + dataOffset;
- if (size >= col.DataSize) {
- res[col.Offset] = 255;
-
- auto prefixSize = (col.DataSize - 1 - 2 * sizeof(ui32));
- auto overflowSize = size - prefixSize;
- auto overflowOffset = overflow.size();
-
- overflow.resize(overflowOffset + overflowSize);
-
- WriteUnaligned<ui32>(res + col.Offset + 1 +
- 0 * sizeof(ui32),
- overflowOffset);
- WriteUnaligned<ui32>(
- res + col.Offset + 1 + 1 * sizeof(ui32), overflowSize);
- std::memcpy(res + col.Offset + 1 + 2 * sizeof(ui32), data,
- prefixSize);
- std::memcpy(overflow.data() + overflowOffset,
- data + prefixSize, overflowSize);
- } else {
- Y_DEBUG_ABORT_UNLESS(size < 255);
- res[col.Offset] = size;
- std::memcpy(res + col.Offset + 1, data, size);
- std::memset(res + col.Offset + 1 + size, 0,
- col.DataSize - (size + 1));
- }
- if (anyOverflow && col.Role == EColumnRole::Key) {
- hash = CalculateCRC32<TTraits>((ui8 *)&size, sizeof(ui32),
- hash);
- hash = CalculateCRC32<TTraits>(data, size, hash);
- }
- }
-
- // isValid bitmap is NOT included into hashed data
- if (anyOverflow) {
- hash = CalculateCRC32<TTraits>(
- res + KeyColumnsOffset,
- KeyColumnsFixedEnd - KeyColumnsOffset, hash);
- } else {
- hash = CalculateCRC32<TTraits>(
- res + KeyColumnsOffset, KeyColumnsEnd - KeyColumnsOffset);
- }
- WriteUnaligned<ui32>(res, hash);
- }
-
- start += cur_block_size;
- res += cur_block_size * TotalRowSize;
- }
-}
-
-template <typename TTraits>
-void TTupleLayoutFallback<TTraits>::Unpack(
- ui8 **columns, ui8 **isValidBitmask, const ui8 *res,
- const std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start,
- ui32 count) const {
-
- std::vector<ui8 *> block_columns;
- for (const auto col_ind : BlockColumnsOrigInds_) {
- block_columns.push_back(columns[col_ind]);
- }
-
- for (size_t row_ind = 0; row_ind < count; row_ind += BlockRows_) {
- const size_t cur_block_size = std::min(count - row_ind, BlockRows_);
- size_t cols_past = 0;
-
- for (const auto &simd_block : SIMDBlock_) {
-#define CASE(i, j) \
- case i *kSIMDMaxCols + j: \
- SIMDPack<TTraits>::template UnpackTupleOrImpl<i + 1, j + 1>( \
- res + simd_block.RowOffset, block_columns.data() + cols_past, \
- cur_block_size, BlockFixedColsSizes_.data() + cols_past, \
- BlockColsOffsets_.data() + cols_past, TotalRowSize, \
- SIMDPermMasks_.data() + simd_block.PermMaskOffset + i * j, start); \
- break;
-
- switch ((simd_block.InnerLoopIters - 1) * kSIMDMaxCols +
- simd_block.Cols - 1) {
- MULTI_8(MULTI_8_I, CASE)
-
- default:
- std::abort();
- }
-
-#undef CASE
-
- cols_past += simd_block.Cols;
- }
-
- UnpackTupleFallbackColImpl(
- res, block_columns.data() + cols_past,
- BlockColsOffsets_.size() - cols_past, cur_block_size,
- BlockFixedColsSizes_.data() + cols_past,
- BlockColsOffsets_.data() + cols_past, TotalRowSize, start);
-
- for (ui32 cols_ind = 0; cols_ind < Columns.size(); cols_ind += 8) {
- ui8 *bitmasks[8];
- const size_t cols = std::min<size_t>(8ul, Columns.size() - cols_ind);
- for (size_t ind = 0; ind != cols; ++ind) {
- const auto &col = Columns[cols_ind + ind];
- bitmasks[ind] = isValidBitmask[col.OriginalIndex] + start / 8;
- }
- ui8 trash_byte;
- for (size_t ind = cols; ind != 8; ++ind) {
- bitmasks[ind] = &trash_byte; // dereferencable
- }
-
- const auto advance_masks = [&] {
- for (size_t ind = 0; ind != cols; ++ind) {
- ++bitmasks[ind];
- }
- };
-
- const size_t first_full_byte =
- std::min<size_t>((8ul - start) & 7, cur_block_size);
- size_t block_row_ind = 0;
-
- const auto simple_mask_transpose = [&](const size_t until) {
- for (size_t col_ind = 0;
- block_row_ind != until && col_ind != cols; ++col_ind) {
- auto col_bitmask =
- bitmasks[col_ind][0] & ~((0xFF << (block_row_ind & 7)) ^
- (0xFF << (until & 7)));
-
- for (size_t row_ind = block_row_ind; row_ind < until;
- ++row_ind) {
- const auto shift = (start + row_ind) % 8;
-
- const auto new_res = res + row_ind * TotalRowSize;
- const auto res = new_res;
-
- col_bitmask |=
- ((res[BitmaskOffset + cols_ind / 8] >> col_ind) &
- 1u)
- << shift;
- }
-
- bitmasks[col_ind][0] = col_bitmask;
- }
- block_row_ind = until;
- };
-
- simple_mask_transpose(first_full_byte);
- if (first_full_byte) {
- advance_masks();
- }
-
- for (; block_row_ind + 7 < cur_block_size; block_row_ind += 8) {
- transposeBitmatrix(bitmasks,
- res + block_row_ind * TotalRowSize +
- BitmaskOffset + cols_ind / 8,
- TotalRowSize);
- advance_masks();
- }
-
- simple_mask_transpose(cur_block_size);
- }
-
- for (size_t block_row_ind = 0; block_row_ind != cur_block_size;
- ++block_row_ind) {
-
- const auto new_start = start + block_row_ind;
- const auto start = new_start;
-
- const auto new_res = res + block_row_ind * TotalRowSize;
- const auto res = new_res;
-
- for (auto &col : VariableColumns_) {
- const auto dataOffset = ReadUnaligned<ui32>(
- columns[col.OriginalIndex] + sizeof(ui32) * start);
- auto *const data = columns[col.OriginalIndex + 1] + dataOffset;
-
- ui32 size = ReadUnaligned<ui8>(res + col.Offset);
-
- if (size < 255) { // embedded str
- std::memcpy(data, res + col.Offset + 1, size);
- } else { // overflow buffer used
- const auto prefixSize =
- (col.DataSize - 1 - 2 * sizeof(ui32));
- const auto overflowOffset = ReadUnaligned<ui32>(
- res + col.Offset + 1 + 0 * sizeof(ui32));
- const auto overflowSize = ReadUnaligned<ui32>(
- res + col.Offset + 1 + 1 * sizeof(ui32));
-
- std::memcpy(data, res + col.Offset + 1 + 2 * sizeof(ui32),
- prefixSize);
- std::memcpy(data + prefixSize,
- overflow.data() + overflowOffset, overflowSize);
-
- size = prefixSize + overflowSize;
- }
-
- WriteUnaligned<ui32>(columns[col.OriginalIndex] +
- sizeof(ui32) * (start + 1),
- dataOffset + size);
- }
- }
-
- start += cur_block_size;
- res += cur_block_size * TotalRowSize;
- }
-}
-
-template __attribute__((target("avx2"))) void
-TTupleLayoutFallback<NSimd::TSimdAVX2Traits>::Pack(
- const ui8 **columns, const ui8 **isValidBitmask, ui8 *res,
- std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start,
- ui32 count) const;
-template __attribute__((target("sse4.2"))) void
-TTupleLayoutFallback<NSimd::TSimdSSE42Traits>::Pack(
- const ui8 **columns, const ui8 **isValidBitmask, ui8 *res,
- std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start,
- ui32 count) const;
-
-template __attribute__((target("avx2"))) void
-TTupleLayoutFallback<NSimd::TSimdAVX2Traits>::Unpack(
- ui8 **columns, ui8 **isValidBitmask, const ui8 *res,
- const std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start,
- ui32 count) const;
-template __attribute__((target("sse4.2"))) void
-TTupleLayoutFallback<NSimd::TSimdSSE42Traits>::Unpack(
- ui8 **columns, ui8 **isValidBitmask, const ui8 *res,
- const std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start,
- ui32 count) const;
-
-} // namespace NPackedTuple
-} // namespace NMiniKQL
-} // namespace NKikimr
diff --git a/yql/essentials/minikql/comp_nodes/packed_tuple/tuple.h b/yql/essentials/minikql/comp_nodes/packed_tuple/tuple.h
deleted file mode 100644
index 3c6b76750c..0000000000
--- a/yql/essentials/minikql/comp_nodes/packed_tuple/tuple.h
+++ /dev/null
@@ -1,136 +0,0 @@
-#pragma once
-
-#include <yql/essentials/minikql/mkql_node.h>
-#include <yql/essentials/public/udf/udf_data_type.h>
-#include <yql/essentials/public/udf/udf_types.h>
-
-#include <util/generic/buffer.h>
-
-#include <util/system/cpu_id.h>
-#include <contrib/ydb/library/yql/utils/simd/simd.h>
-
-namespace NKikimr {
-namespace NMiniKQL {
-namespace NPackedTuple {
-
-// Defines if data type of particular column variable or fixed
-enum class EColumnSizeType { Fixed, Variable };
-
-// Defines if particular column is key column or payload column
-enum class EColumnRole { Key, Payload };
-
-// Describes layout and size of particular column
-struct TColumnDesc {
- ui32 ColumnIndex = 0; // Index of the column in particular layout
- ui32 OriginalIndex = 0; // Index of the column in input representation
- EColumnRole Role = EColumnRole::Payload; // Role of the particular column in
- // tuple (Key or Payload)
- EColumnSizeType SizeType =
- EColumnSizeType::Fixed; // Fixed size or variable size column
- ui32 DataSize = 0; // Size of the column in bytes for fixed size part
- // Must be same for matching key columns
- ui32 Offset =
- 0; // Offset in bytes for column value from the beginning of tuple
-};
-
-// Defines in memory layout of tuple.
-struct TTupleLayout {
- std::vector<TColumnDesc> OrigColumns; // Columns description and order as
- // passed during layout construction
- std::vector<TColumnDesc> Columns; // Vector describing all columns in order
- // corresponding to tuple layout
- std::vector<TColumnDesc> KeyColumns; // Vector describing key columns
- std::vector<TColumnDesc>
- PayloadColumns; // Vector describing payload columns
- ui32 KeyColumnsNum; // Total number of key columns
- ui32 KeyColumnsSize; // Total size of all key columns in bytes
- ui32 KeyColumnsOffset; // Start of row-packed keys data
- ui32 KeyColumnsFixedEnd; // Offset in row-packed keys data of first variable
- // key (can be same as KeyColumnsEnd, if there are
- // none)
- ui32 KeyColumnsFixedNum; // Number of fixed-size columns
- ui32 KeyColumnsEnd; // First byte after key columns. Start of bitmask for
- // row-based columns
- ui32 BitmaskSize; // Size of bitmask for null values flag in columns
- ui32 BitmaskOffset; // Offset of nulls bitmask. = KeyColumnsEnd
- ui32 BitmaskEnd; // First byte after bitmask. = PayloadOffset
- ui32 PayloadSize; // Total size in bytes of the payload columns
- ui32 PayloadOffset; // Offset of payload values. = BitmaskEnd.
- ui32 PayloadEnd; // First byte after payload
- ui32 TotalRowSize; // Total size of bytes for packed row
-
- // Creates new tuple layout based on provided columns description.
- static THolder<TTupleLayout>
- Create(const std::vector<TColumnDesc> &columns);
-
- TTupleLayout(const std::vector<TColumnDesc> &columns)
- : OrigColumns(columns) {}
- virtual ~TTupleLayout() {}
-
- // Takes array of pointer to columns, array of validity bitmaps,
- // outputs packed rows
- virtual void Pack(const ui8 **columns, const ui8 **isValidBitmask, ui8 *res,
- std::vector<ui8, TMKQLAllocator<ui8>> &overflow,
- ui32 start, ui32 count) const = 0;
-
- // Takes packed rows,
- // outputs array of pointer to columns, array of validity bitmaps
- virtual void Unpack(ui8 **columns, ui8 **isValidBitmask, const ui8 *res,
- const std::vector<ui8, TMKQLAllocator<ui8>> &overflow,
- ui32 start, ui32 count) const = 0;
-};
-
-template <typename TTrait> struct TTupleLayoutFallback : public TTupleLayout {
-
- TTupleLayoutFallback(const std::vector<TColumnDesc> &columns);
-
- void Pack(const ui8 **columns, const ui8 **isValidBitmask, ui8 *res,
- std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start,
- ui32 count) const override;
-
- void Unpack(ui8 **columns, ui8 **isValidBitmask, const ui8 *res,
- const std::vector<ui8, TMKQLAllocator<ui8>> &overflow,
- ui32 start, ui32 count) const override;
-
- private:
- std::array<std::vector<TColumnDesc>, 5>
- FixedPOTColumns_; // Fixed-size columns for power-of-two sizes from 1 to
- // 16 bytes
- std::vector<TColumnDesc> FixedNPOTColumns_; // Remaining fixed-size columns
- std::vector<TColumnDesc> VariableColumns_; // Variable-size columns only
- using TSimdI8 = typename TTrait::TSimdI8;
- template <class T> using TSimd = typename TTrait::template TSimd8<T>;
-
- static constexpr ui8 kSIMDMaxCols = 8;
- static constexpr ui8 kSIMDMaxInnerLoopSize = 8;
-
- size_t BlockRows_; // Estimated rows per cache block
- std::vector<size_t> BlockColsOffsets_;
- std::vector<size_t> BlockFixedColsSizes_;
- std::vector<size_t> BlockColumnsOrigInds_;
-
- struct SIMDDesc {
- ui8 InnerLoopIters;
- ui8 Cols;
- size_t PermMaskOffset;
- size_t RowOffset;
- };
- std::vector<SIMDDesc> SIMDBlock_; // SIMD iterations description
- std::vector<TSimd<ui8>> SIMDPermMasks_; // SIMD precomputed masks
-};
-
-template <>
-void TTupleLayoutFallback<NSimd::TSimdFallbackTraits>::Pack(
- const ui8 **columns, const ui8 **isValidBitmask, ui8 *res,
- std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start,
- ui32 count) const;
-
-template <>
-void TTupleLayoutFallback<NSimd::TSimdFallbackTraits>::Unpack(
- ui8 **columns, ui8 **isValidBitmask, const ui8 *res,
- const std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start,
- ui32 count) const;
-
-} // namespace NPackedTuple
-} // namespace NMiniKQL
-} // namespace NKikimr
diff --git a/yql/essentials/minikql/comp_nodes/packed_tuple/ut/ya.make b/yql/essentials/minikql/comp_nodes/packed_tuple/ut/ya.make
deleted file mode 100644
index 4d27ab3b99..0000000000
--- a/yql/essentials/minikql/comp_nodes/packed_tuple/ut/ya.make
+++ /dev/null
@@ -1,41 +0,0 @@
-UNITTEST_FOR(yql/essentials/minikql/comp_nodes/packed_tuple)
-
-IF (SANITIZER_TYPE OR NOT OPENSOURCE)
- REQUIREMENTS(ram:32)
-ENDIF()
-
-IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND)
- TIMEOUT(3600)
- SIZE(LARGE)
- TAG(ya:fat)
-ELSE()
- TIMEOUT(600)
- SIZE(MEDIUM)
-ENDIF()
-
-
-SRCS(
- packed_tuple_ut.cpp
-)
-
-PEERDIR(
- yql/essentials/public/udf
- yql/essentials/public/udf/arrow
- yql/essentials/public/udf/service/exception_policy
- yql/essentials/sql/pg_dummy
-)
-
-CFLAGS(
- -mprfchw
-)
-
-YQL_LAST_ABI_VERSION()
-
-IF (MKQL_RUNTIME_VERSION)
- CFLAGS(
- -DMKQL_RUNTIME_VERSION=$MKQL_RUNTIME_VERSION
- )
-ENDIF()
-
-
-END()
diff --git a/yql/essentials/minikql/comp_nodes/packed_tuple/ya.make b/yql/essentials/minikql/comp_nodes/packed_tuple/ya.make
deleted file mode 100644
index fa3ffbf859..0000000000
--- a/yql/essentials/minikql/comp_nodes/packed_tuple/ya.make
+++ /dev/null
@@ -1,28 +0,0 @@
-LIBRARY()
-
-SRCS(
- tuple.cpp
-)
-
-PEERDIR(
- contrib/libs/apache/arrow
- yql/essentials/types/binary_json
- yql/essentials/minikql
- yql/essentials/utils
- yql/essentials/utils/log
- library/cpp/digest/crc32c
-)
-
-CFLAGS(
- -mprfchw
- -mavx2
- -DMKQL_DISABLE_CODEGEN
-)
-
-YQL_LAST_ABI_VERSION()
-
-END()
-
-RECURSE_FOR_TESTS(
- ut
-)
diff --git a/yql/essentials/minikql/comp_nodes/ya.make b/yql/essentials/minikql/comp_nodes/ya.make
index 1b2cc49327..fa7a96b61a 100644
--- a/yql/essentials/minikql/comp_nodes/ya.make
+++ b/yql/essentials/minikql/comp_nodes/ya.make
@@ -13,7 +13,6 @@ END()
RECURSE(
llvm14
no_llvm
- packed_tuple
)
RECURSE_FOR_TESTS(
diff --git a/yql/essentials/minikql/comp_nodes/ya.make.inc b/yql/essentials/minikql/comp_nodes/ya.make.inc
index 89b315fa39..3c1531eac7 100644
--- a/yql/essentials/minikql/comp_nodes/ya.make.inc
+++ b/yql/essentials/minikql/comp_nodes/ya.make.inc
@@ -160,7 +160,6 @@ PEERDIR(
yql/essentials/public/udf/arrow
yql/essentials/parser/pg_wrapper/interface
yql/essentials/utils
- contrib/ydb/library/actors/core
yql/essentials/public/issue/protos
)
diff --git a/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp
index 0a28de9162..400f77e9d1 100644
--- a/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp
+++ b/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp
@@ -11,7 +11,6 @@
#include <yql/essentials/minikql/computation/mkql_computation_node_impl.h>
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
#include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
-#include <contrib/ydb/library/yql/dq/proto/dq_tasks.pb.h>
#include <library/cpp/testing/unittest/registar.h>
diff --git a/yql/essentials/minikql/computation/mkql_spiller_factory.h b/yql/essentials/minikql/computation/mkql_spiller_factory.h
index c9c1bfab38..0d6dffe673 100644
--- a/yql/essentials/minikql/computation/mkql_spiller_factory.h
+++ b/yql/essentials/minikql/computation/mkql_spiller_factory.h
@@ -2,7 +2,9 @@
#include "mkql_spiller.h"
-#include <contrib/ydb/library/yql/dq/actors/spilling/spilling_counters.h>
+namespace NYql::NDq {
+struct TSpillingTaskCounters;
+}
namespace NKikimr::NMiniKQL {
@@ -11,7 +13,7 @@ class ISpillerFactory : private TNonCopyable
public:
virtual ISpiller::TPtr CreateSpiller() = 0;
- virtual void SetTaskCounters(TIntrusivePtr<NYql::NDq::TSpillingTaskCounters> spillingTaskCounters) = 0;
+ virtual void SetTaskCounters(const TIntrusivePtr<NYql::NDq::TSpillingTaskCounters>& spillingTaskCounters) = 0;
virtual ~ISpillerFactory(){}
};
diff --git a/yql/essentials/minikql/computation/mock_spiller_factory_ut.h b/yql/essentials/minikql/computation/mock_spiller_factory_ut.h
index 9ea74569b1..c053b2c52e 100644
--- a/yql/essentials/minikql/computation/mock_spiller_factory_ut.h
+++ b/yql/essentials/minikql/computation/mock_spiller_factory_ut.h
@@ -8,7 +8,7 @@ namespace NKikimr::NMiniKQL {
class TMockSpillerFactory : public ISpillerFactory
{
public:
- void SetTaskCounters(TIntrusivePtr<NYql::NDq::TSpillingTaskCounters> /*spillingTaskCounters*/) override {
+ void SetTaskCounters(const TIntrusivePtr<NYql::NDq::TSpillingTaskCounters>& /*spillingTaskCounters*/) override {
}
ISpiller::TPtr CreateSpiller() override {
diff --git a/yql/essentials/minikql/computation/ut/ya.make.inc b/yql/essentials/minikql/computation/ut/ya.make.inc
index 0b419140e2..4de4b13fc5 100644
--- a/yql/essentials/minikql/computation/ut/ya.make.inc
+++ b/yql/essentials/minikql/computation/ut/ya.make.inc
@@ -30,7 +30,6 @@ PEERDIR(
library/cpp/threading/local_executor
yql/essentials/parser/pg_wrapper
yql/essentials/public/udf/service/exception_policy
- contrib/ydb/library/yql/dq/proto
)
YQL_LAST_ABI_VERSION()
diff --git a/yql/essentials/minikql/invoke_builtins/mkql_builtins_convert.cpp b/yql/essentials/minikql/invoke_builtins/mkql_builtins_convert.cpp
index 78d62d423d..0f2a676190 100644
--- a/yql/essentials/minikql/invoke_builtins/mkql_builtins_convert.cpp
+++ b/yql/essentials/minikql/invoke_builtins/mkql_builtins_convert.cpp
@@ -572,12 +572,13 @@ struct TStringConvert {
};
NUdf::TUnboxedValuePod JsonToJsonDocument(const NUdf::TUnboxedValuePod value) {
- auto binaryJson = NKikimr::NBinaryJson::SerializeToBinaryJson(value.AsStringRef());
- if (!binaryJson.IsSuccess()) {
+ auto maybeBinaryJson = NKikimr::NBinaryJson::SerializeToBinaryJson(value.AsStringRef());
+ if (std::holds_alternative<TString>(maybeBinaryJson)) {
// JSON parse error happened, return NULL
return NUdf::TUnboxedValuePod();
}
- return MakeString(TStringBuf(binaryJson->Data(), binaryJson->Size()));
+ const auto& binaryJson = std::get<NKikimr::NBinaryJson::TBinaryJson>(maybeBinaryJson);
+ return MakeString(TStringBuf(binaryJson.Data(), binaryJson.Size()));
}
struct TJsonToJsonDocumentConvert {
diff --git a/yql/essentials/minikql/jsonpath/ut/test_base.cpp b/yql/essentials/minikql/jsonpath/ut/test_base.cpp
index feceecddb1..c9b4f5669d 100644
--- a/yql/essentials/minikql/jsonpath/ut/test_base.cpp
+++ b/yql/essentials/minikql/jsonpath/ut/test_base.cpp
@@ -26,7 +26,7 @@ void TJsonPathTestBase::RunTestCase(const TString& rawJson, const TString& rawJs
try {
const auto unboxedValueJson = TValue(ParseJson(rawJson));
- const auto binaryJson = *SerializeToBinaryJson(rawJson);;
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(rawJson));
auto reader = TBinaryJsonReader::Make(binaryJson);
auto binaryJsonRoot = TValue(reader->GetRootCursor());
@@ -77,7 +77,7 @@ void TJsonPathTestBase::RunRuntimeErrorTestCase(const TString& rawJson, const TS
try {
const auto unboxedValueJson = TValue(ParseJson(rawJson));
- const auto binaryJson = *SerializeToBinaryJson(rawJson);
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(rawJson));
auto reader = TBinaryJsonReader::Make(binaryJson);
auto binaryJsonRoot = TValue(reader->GetRootCursor());
@@ -105,7 +105,7 @@ void TJsonPathTestBase::RunVariablesTestCase(const TString& rawJson, const THash
try {
const auto unboxedValueJson = TValue(ParseJson(rawJson));
- const auto binaryJson = *SerializeToBinaryJson(rawJson);
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(rawJson));
auto reader = TBinaryJsonReader::Make(binaryJson);
auto binaryJsonRoot = TValue(reader->GetRootCursor());
@@ -120,7 +120,7 @@ void TJsonPathTestBase::RunVariablesTestCase(const TString& rawJson, const THash
storage.reserve(variables.size());
readers.reserve(variables.size());
for (const auto& it : variables) {
- storage.push_back(*SerializeToBinaryJson(it.second));
+ storage.push_back(std::get<TBinaryJson>(SerializeToBinaryJson(it.second)));
readers.push_back(TBinaryJsonReader::Make(storage.back()));
binaryJsonVariables[it.first] = TValue(readers.back()->GetRootCursor());
}
diff --git a/yql/essentials/minikql/mkql_type_ops.cpp b/yql/essentials/minikql/mkql_type_ops.cpp
index 6b823dbdf5..11762e96c4 100644
--- a/yql/essentials/minikql/mkql_type_ops.cpp
+++ b/yql/essentials/minikql/mkql_type_ops.cpp
@@ -2518,11 +2518,12 @@ NUdf::TUnboxedValuePod ValueFromString(NUdf::EDataSlot type, NUdf::TStringRef bu
case NUdf::EDataSlot::JsonDocument: {
auto binaryJson = NKikimr::NBinaryJson::SerializeToBinaryJson(buf);
- if (binaryJson.IsFail()) {
+ if (std::holds_alternative<TString>(binaryJson)) {
// JSON parse error happened, return NULL
return NUdf::TUnboxedValuePod();
}
- return MakeString(TStringBuf(binaryJson->Data(), binaryJson->Size()));
+ const auto& value = std::get<NKikimr::NBinaryJson::TBinaryJson>(binaryJson);
+ return MakeString(TStringBuf(value.Data(), value.Size()));
}
case NUdf::EDataSlot::Decimal:
diff --git a/yql/essentials/providers/common/dq/ya.make b/yql/essentials/providers/common/dq/ya.make
new file mode 100644
index 0000000000..e0ce0e804a
--- /dev/null
+++ b/yql/essentials/providers/common/dq/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+
+SRCS(
+ yql_dq_integration_impl.cpp
+ yql_dq_optimization_impl.cpp
+)
+
+PEERDIR(
+ yql/essentials/core/dq_integration
+)
+
+END()
diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp
new file mode 100644
index 0000000000..600e3c72e2
--- /dev/null
+++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp
@@ -0,0 +1,96 @@
+#include "yql_dq_integration_impl.h"
+
+namespace NYql {
+
+ui64 TDqIntegrationBase::Partition(const TDqSettings&, size_t, const TExprNode&,
+ TVector<TString>&, TString*, TExprContext&, bool) {
+ return 0;
+}
+
+bool TDqIntegrationBase::CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues) {
+ Y_UNUSED(skipIssues);
+ Y_UNUSED(node);
+ Y_UNUSED(ctx);
+ return true;
+}
+
+bool TDqIntegrationBase::CanRead(const TExprNode&, TExprContext&, bool) {
+ return false;
+}
+
+TMaybe<ui64> TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TVector<const TExprNode*> &, TExprContext&) {
+ return Nothing();
+}
+
+TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext&) {
+ return read;
+}
+
+TMaybe<TOptimizerStatistics> TDqIntegrationBase::ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) {
+ Y_UNUSED(readWrap);
+ Y_UNUSED(ctx);
+ return Nothing();
+}
+
+TExprNode::TPtr TDqIntegrationBase::RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) {
+ Y_UNUSED(ctx);
+ return write;
+}
+
+TMaybe<bool> TDqIntegrationBase::CanWrite(const TExprNode&, TExprContext&) {
+ return Nothing();
+}
+
+bool TDqIntegrationBase::CanBlockRead(const NNodes::TExprBase&, TExprContext&, TTypeAnnotationContext&) {
+ return false;
+}
+
+TExprNode::TPtr TDqIntegrationBase::WrapWrite(const TExprNode::TPtr& write, TExprContext&) {
+ return write;
+}
+
+void TDqIntegrationBase::RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase&) {
+}
+
+bool TDqIntegrationBase::CanFallback() {
+ return false;
+}
+
+void TDqIntegrationBase::FillSourceSettings(const TExprNode&, ::google::protobuf::Any&, TString&, size_t, TExprContext&) {
+}
+
+void TDqIntegrationBase::FillLookupSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) {
+ Y_UNUSED(node);
+ Y_UNUSED(settings);
+ Y_UNUSED(sourceType);
+ YQL_ENSURE(false);
+}
+
+void TDqIntegrationBase::FillSinkSettings(const TExprNode&, ::google::protobuf::Any&, TString&) {
+}
+
+void TDqIntegrationBase::FillTransformSettings(const TExprNode&, ::google::protobuf::Any&) {
+}
+
+void TDqIntegrationBase::Annotate(const TExprNode&, THashMap<TString, TString>&) {
+}
+
+bool TDqIntegrationBase::PrepareFullResultTableParams(const TExprNode&, TExprContext&, THashMap<TString, TString>&, THashMap<TString, TString>&) {
+ return false;
+}
+
+void TDqIntegrationBase::WriteFullResultTableRef(NYson::TYsonWriter&, const TVector<TString>&, const THashMap<TString, TString>&) {
+}
+
+bool TDqIntegrationBase::FillSourcePlanProperties(const NNodes::TExprBase&, TMap<TString, NJson::TJsonValue>&) {
+ return false;
+}
+
+bool TDqIntegrationBase::FillSinkPlanProperties(const NNodes::TExprBase&, TMap<TString, NJson::TJsonValue>&) {
+ return false;
+}
+
+void TDqIntegrationBase::ConfigurePeepholePipeline(bool, const THashMap<TString, TString>&, TTransformationPipeline*) {
+}
+
+} // namespace NYql
diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.h b/yql/essentials/providers/common/dq/yql_dq_integration_impl.h
new file mode 100644
index 0000000000..7cc5fd7d89
--- /dev/null
+++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.h
@@ -0,0 +1,37 @@
+#pragma once
+
+#include <yql/essentials/core/dq_integration/yql_dq_integration.h>
+
+namespace NYql {
+
+class TDqIntegrationBase: public IDqIntegration {
+public:
+ ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node,
+ TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) override;
+ bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues) override;
+ bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues) override;
+ TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) override;
+ TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) override;
+ TMaybe<TOptimizerStatistics> ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) override;
+ TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) override;
+ void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override;
+ TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) override;
+ bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) override;
+ TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) override;
+ bool CanFallback() override;
+ void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType, size_t, TExprContext&) override;
+ void FillLookupSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) override;
+ void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) override;
+ void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) override;
+ void Annotate(const TExprNode& node, THashMap<TString, TString>& params) override;
+ bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) override;
+ void WriteFullResultTableRef(NYson::TYsonWriter& writer, const TVector<TString>& columns, const THashMap<TString, TString>& graphParams) override;
+ bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override;
+ bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override;
+ void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) override;
+
+protected:
+ bool CanBlockReadTypes(const TStructExprType* node);
+};
+
+} // namespace NYql
diff --git a/yql/essentials/providers/common/dq/yql_dq_optimization_impl.cpp b/yql/essentials/providers/common/dq/yql_dq_optimization_impl.cpp
new file mode 100644
index 0000000000..6afa622f05
--- /dev/null
+++ b/yql/essentials/providers/common/dq/yql_dq_optimization_impl.cpp
@@ -0,0 +1,29 @@
+#include "yql_dq_optimization_impl.h"
+
+namespace NYql {
+
+TExprNode::TPtr TDqOptimizationBase::RewriteRead(const TExprNode::TPtr& read, TExprContext& /*ctx*/) {
+ return read;
+}
+
+TExprNode::TPtr TDqOptimizationBase::RewriteLookupRead(const TExprNode::TPtr& /*read*/, TExprContext& /*ctx*/) {
+ return {};
+}
+
+TExprNode::TPtr TDqOptimizationBase::ApplyExtractMembers(const TExprNode::TPtr& read, const TExprNode::TPtr& /*members*/, TExprContext& /*ctx*/) {
+ return read;
+}
+
+TExprNode::TPtr TDqOptimizationBase::ApplyTakeOrSkip(const TExprNode::TPtr& read, const TExprNode::TPtr& /*countBase*/, TExprContext& /*ctx*/) {
+ return read;
+}
+
+TExprNode::TPtr TDqOptimizationBase::ApplyUnordered(const TExprNode::TPtr& read, TExprContext& /*ctx*/) {
+ return read;
+}
+
+TExprNode::TListType TDqOptimizationBase::ApplyExtend(const TExprNode::TListType& reads, bool /*ordered*/, TExprContext& /*ctx*/) {
+ return reads;
+}
+
+} // namespace NYql
diff --git a/yql/essentials/providers/common/dq/yql_dq_optimization_impl.h b/yql/essentials/providers/common/dq/yql_dq_optimization_impl.h
new file mode 100644
index 0000000000..1427b50b0a
--- /dev/null
+++ b/yql/essentials/providers/common/dq/yql_dq_optimization_impl.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <yql/essentials/core/dq_integration/yql_dq_optimization.h>
+
+namespace NYql {
+
+class TDqOptimizationBase: public IDqOptimization {
+public:
+ TExprNode::TPtr RewriteRead(const TExprNode::TPtr& read, TExprContext& ctx) override;
+ TExprNode::TPtr RewriteLookupRead(const TExprNode::TPtr& read, TExprContext& ctx) override;
+ TExprNode::TPtr ApplyExtractMembers(const TExprNode::TPtr& read, const TExprNode::TPtr& members, TExprContext& ctx) override;
+ TExprNode::TPtr ApplyTakeOrSkip(const TExprNode::TPtr& read, const TExprNode::TPtr& countBase, TExprContext& ctx) override;
+ TExprNode::TPtr ApplyUnordered(const TExprNode::TPtr& read, TExprContext& ctx) override;
+ TExprNode::TListType ApplyExtend(const TExprNode::TListType& reads, bool ordered, TExprContext& ctx) override;
+};
+
+} // namespace NYql
diff --git a/yql/essentials/providers/common/ya.make b/yql/essentials/providers/common/ya.make
index 1af0f9eddb..eba3397675 100644
--- a/yql/essentials/providers/common/ya.make
+++ b/yql/essentials/providers/common/ya.make
@@ -4,6 +4,7 @@ RECURSE(
codec
comp_nodes
config
+ dq
gateway
metrics
mkql
diff --git a/yql/essentials/providers/pg/provider/ya.make b/yql/essentials/providers/pg/provider/ya.make
index c117f7a975..6c0f6c0d03 100644
--- a/yql/essentials/providers/pg/provider/ya.make
+++ b/yql/essentials/providers/pg/provider/ya.make
@@ -17,8 +17,8 @@ YQL_LAST_ABI_VERSION()
PEERDIR(
yql/essentials/core
yql/essentials/core/type_ann
- contrib/ydb/library/yql/dq/integration
- contrib/ydb/library/yql/providers/common/dq
+ yql/essentials/core/dq_integration
+ yql/essentials/providers/common/dq
yql/essentials/providers/common/provider
yql/essentials/providers/common/transform
yql/essentials/providers/pg/expr_nodes
diff --git a/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp b/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp
index 9940898e9b..2e8a9b434f 100644
--- a/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp
+++ b/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp
@@ -1,5 +1,5 @@
#include "yql_pg_provider_impl.h"
-#include <contrib/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h>
+#include <yql/essentials/providers/common/dq/yql_dq_integration_impl.h>
#include <yql/essentials/providers/pg/expr_nodes/yql_pg_expr_nodes.h>
namespace NYql {
diff --git a/yql/essentials/providers/pg/provider/yql_pg_provider_impl.h b/yql/essentials/providers/pg/provider/yql_pg_provider_impl.h
index f63b54b5c2..c4cc036bae 100644
--- a/yql/essentials/providers/pg/provider/yql_pg_provider_impl.h
+++ b/yql/essentials/providers/pg/provider/yql_pg_provider_impl.h
@@ -4,7 +4,7 @@
#include <yql/essentials/core/yql_data_provider.h>
#include <yql/essentials/providers/common/transform/yql_visit.h>
#include <yql/essentials/providers/common/transform/yql_exec.h>
-#include <contrib/ydb/library/yql/dq/integration/yql_dq_integration.h>
+#include <yql/essentials/core/dq_integration/yql_dq_integration.h>
#include <util/generic/ptr.h>
diff --git a/yql/essentials/types/binary_json/ut/container_ut.cpp b/yql/essentials/types/binary_json/ut/container_ut.cpp
index 92144169b4..869d2323bd 100644
--- a/yql/essentials/types/binary_json/ut/container_ut.cpp
+++ b/yql/essentials/types/binary_json/ut/container_ut.cpp
@@ -35,7 +35,7 @@ public:
};
for (const auto& testCase : testCases) {
- const auto binaryJson = *SerializeToBinaryJson(testCase.first);
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.first));
const auto reader = TBinaryJsonReader::Make(binaryJson);
const auto container = reader->GetRootCursor();
@@ -52,7 +52,7 @@ public:
};
for (const auto& testCase : testCases) {
- const auto binaryJson = *SerializeToBinaryJson(testCase.first);
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.first));
const auto reader = TBinaryJsonReader::Make(binaryJson);
const auto container = reader->GetRootCursor();
@@ -78,7 +78,7 @@ public:
};
for (const auto& testCase : testCases) {
- const auto binaryJson = *SerializeToBinaryJson(testCase.Json);
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.Json));
const auto reader = TBinaryJsonReader::Make(binaryJson);
const auto container = reader->GetRootCursor();
const auto element = container.GetElement(testCase.Index);
@@ -100,7 +100,7 @@ public:
};
for (const auto& testCase : testCases) {
- const auto binaryJson = *SerializeToBinaryJson(testCase.Json);
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.Json));
const auto reader = TBinaryJsonReader::Make(binaryJson);
const auto container = reader->GetRootCursor();
@@ -145,7 +145,7 @@ public:
};
for (const auto& testCase : testCases) {
- const auto binaryJson = *SerializeToBinaryJson(testCase.Json);
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.Json));
const auto reader = TBinaryJsonReader::Make(binaryJson);
const auto container = reader->GetRootCursor();
const auto result = container.Lookup(testCase.Key);
@@ -186,7 +186,7 @@ public:
};
for (const auto& testCase : testCases) {
- const auto binaryJson = *SerializeToBinaryJson(testCase.Json);
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.Json));
const auto reader = TBinaryJsonReader::Make(binaryJson);
const auto container = reader->GetRootCursor();
diff --git a/yql/essentials/types/binary_json/ut/entry_ut.cpp b/yql/essentials/types/binary_json/ut/entry_ut.cpp
index 998bd64f63..0f099ed59d 100644
--- a/yql/essentials/types/binary_json/ut/entry_ut.cpp
+++ b/yql/essentials/types/binary_json/ut/entry_ut.cpp
@@ -33,7 +33,7 @@ public:
};
for (const auto& testCase : testCases) {
- const auto binaryJson = *SerializeToBinaryJson(testCase.first);
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.first));
const auto reader = TBinaryJsonReader::Make(binaryJson);
const auto container = reader->GetRootCursor();
@@ -50,7 +50,7 @@ public:
};
for (const auto& testCase : testCases) {
- const auto binaryJson = *SerializeToBinaryJson(testCase.first);
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.first));
const auto reader = TBinaryJsonReader::Make(binaryJson);
const auto container = reader->GetRootCursor();
const auto innerContainer = container.GetElement(0).GetContainer();
@@ -67,7 +67,7 @@ public:
};
for (const auto& testCase : testCases) {
- const auto binaryJson = *SerializeToBinaryJson(testCase.first);
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.first));
const auto reader = TBinaryJsonReader::Make(binaryJson);
const auto container = reader->GetRootCursor();
@@ -86,7 +86,7 @@ public:
};
for (const auto& testCase : testCases) {
- const auto binaryJson = *SerializeToBinaryJson(testCase.first);
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.first));
const auto reader = TBinaryJsonReader::Make(binaryJson);
const auto container = reader->GetRootCursor();
diff --git a/yql/essentials/types/binary_json/ut/identity_ut.cpp b/yql/essentials/types/binary_json/ut/identity_ut.cpp
index 58086d9537..50c4299d5c 100644
--- a/yql/essentials/types/binary_json/ut/identity_ut.cpp
+++ b/yql/essentials/types/binary_json/ut/identity_ut.cpp
@@ -40,7 +40,7 @@ public:
void TestReadToJsonDom() {
for (const TStringBuf json : TestCases) {
- const auto binaryJson = *NBinaryJson::SerializeToBinaryJson(json);
+ const auto binaryJson = std::get<TBinaryJson>(NBinaryJson::SerializeToBinaryJson(json));
const auto value = NBinaryJson::ReadToJsonDom(binaryJson, &ValueBuilder);
const auto jsonAfterBinaryJson = NDom::SerializeJsonDom(value);
@@ -50,7 +50,7 @@ public:
void TestSerializeToJson() {
for (const TStringBuf json : TestCases) {
- const auto binaryJson = *NBinaryJson::SerializeToBinaryJson(json);
+ const auto binaryJson = std::get<TBinaryJson>(NBinaryJson::SerializeToBinaryJson(json));
const auto jsonAfterBinaryJson = NBinaryJson::SerializeToJson(binaryJson);
UNIT_ASSERT_VALUES_EQUAL(json, jsonAfterBinaryJson);
diff --git a/yql/essentials/types/binary_json/ut/valid_ut.cpp b/yql/essentials/types/binary_json/ut/valid_ut.cpp
index f8b1661866..3078375877 100644
--- a/yql/essentials/types/binary_json/ut/valid_ut.cpp
+++ b/yql/essentials/types/binary_json/ut/valid_ut.cpp
@@ -45,7 +45,7 @@ public:
};
for (const TStringBuf json : testCases) {
- const auto binaryJson = *SerializeToBinaryJson(json);
+ const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(json));
const TStringBuf buffer(binaryJson.Data(), binaryJson.Size());
const auto error = IsValidBinaryJsonWithError(buffer);
UNIT_ASSERT_C(!error.Defined(), TStringBuilder() << "BinaryJson for '" << json << "' is invalid because of '" << *error << "'");
diff --git a/yql/essentials/types/binary_json/write.cpp b/yql/essentials/types/binary_json/write.cpp
index 9eecc0cba3..28dffea9db 100644
--- a/yql/essentials/types/binary_json/write.cpp
+++ b/yql/essentials/types/binary_json/write.cpp
@@ -715,26 +715,31 @@ template <typename TOnDemandValue>
}
}
-TConclusion<TBinaryJson> SerializeToBinaryJsonImpl(const TStringBuf json) {
+std::variant<TBinaryJson, TString> SerializeToBinaryJsonImpl(const TStringBuf json) {
+ std::variant<TBinaryJson, TString> res;
TBinaryJsonCallbacks callbacks(/* throwException */ false);
const simdjson::padded_string paddedJson(json);
simdjson::ondemand::parser parser;
try {
auto doc = parser.iterate(paddedJson);
if (auto status = doc.error(); status != simdjson::SUCCESS) {
- return TConclusionStatus::Fail(simdjson::error_message(status));
+ res = TString(simdjson::error_message(status));
+ return res;
}
if (auto status = SimdJsonToJsonIndex(doc.value_unsafe(), callbacks); status != simdjson::SUCCESS) {
- return TConclusionStatus::Fail(simdjson::error_message(status));
+ res = TString(simdjson::error_message(status));
+ return res;
}
} catch (const simdjson::simdjson_error& e) {
- return TConclusionStatus::Fail(e.what());
+ res = TString(e.what());
+ return res;
}
TBinaryJsonSerializer serializer(std::move(callbacks).GetResult());
- return std::move(serializer).Serialize();
+ res = std::move(serializer).Serialize();
+ return res;
}
-TConclusion<TBinaryJson> SerializeToBinaryJson(const TStringBuf json) {
+std::variant<TBinaryJson, TString> SerializeToBinaryJson(const TStringBuf json) {
return SerializeToBinaryJsonImpl(json);
}
diff --git a/yql/essentials/types/binary_json/write.h b/yql/essentials/types/binary_json/write.h
index 0083637963..c4f8f89d2d 100644
--- a/yql/essentials/types/binary_json/write.h
+++ b/yql/essentials/types/binary_json/write.h
@@ -2,17 +2,18 @@
#include "format.h"
-#include <contrib/ydb/library/conclusion/result.h>
#include <yql/essentials/minikql/dom/node.h>
#include <util/generic/maybe.h>
+#include <variant>
+
namespace NKikimr::NBinaryJson {
/**
* @brief Translates textual JSON into BinaryJson
*/
-TConclusion<TBinaryJson> SerializeToBinaryJson(const TStringBuf json);
+std::variant<TBinaryJson, TString> SerializeToBinaryJson(const TStringBuf json);
/**
* @brief Translates DOM layout from `yql/library/dom` library into BinaryJson