diff options
author | vvvv <vvvv@yandex-team.com> | 2024-11-09 03:48:37 +0300 |
---|---|---|
committer | Vitaly Stoyan <vvvv@ydb.tech> | 2024-11-09 12:02:05 +0300 |
commit | ea0c2bd5b40e0be0147c40092f9d270ec11d015a (patch) | |
tree | ea1c768caad5d542e2ff8740e8ce5b55840037c4 /yql | |
parent | 2d0e7498c5e5f795c1a040623052b112691fac7e (diff) | |
download | ydb-ea0c2bd5b40e0be0147c40092f9d270ec11d015a.tar.gz |
Tune YDB <-> YQL deps
init
commit_hash:16572ab4e94aea4f7455c2ccb90b70ea99a412db
Diffstat (limited to 'yql')
40 files changed, 509 insertions, 2617 deletions
diff --git a/yql/essentials/core/dq_integration/transform/ya.make b/yql/essentials/core/dq_integration/transform/ya.make new file mode 100644 index 0000000000..192fcf8921 --- /dev/null +++ b/yql/essentials/core/dq_integration/transform/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + yql_dq_task_transform.cpp +) + +PEERDIR( + yql/essentials/minikql +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/yql/essentials/core/dq_integration/transform/yql_dq_task_transform.cpp b/yql/essentials/core/dq_integration/transform/yql_dq_task_transform.cpp new file mode 100644 index 0000000000..d520ec2792 --- /dev/null +++ b/yql/essentials/core/dq_integration/transform/yql_dq_task_transform.cpp @@ -0,0 +1,22 @@ +#include <yql/essentials/core/dq_integration/transform/yql_dq_task_transform.h> + +namespace NYql { + +TTaskTransformFactory CreateCompositeTaskTransformFactory(TVector<TTaskTransformFactory> factories) { + return [factories = std::move(factories)] (const THashMap<TString, TString>& taskParams, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry) -> NKikimr::NMiniKQL::TCallableVisitFuncProvider { + TVector<NKikimr::NMiniKQL::TCallableVisitFuncProvider> funcProviders; + for (auto& factory: factories) { + funcProviders.push_back(factory(taskParams, funcRegistry)); + } + return [funcProviders = std::move(funcProviders)] (const NKikimr::NMiniKQL::TInternName& name) -> NKikimr::NMiniKQL::TCallableVisitFunc { + for (auto& provider: funcProviders) { + if (auto res = provider(name)) { + return res; + } + } + return {}; + }; + }; +} + +} // NYql diff --git a/yql/essentials/core/dq_integration/transform/yql_dq_task_transform.h b/yql/essentials/core/dq_integration/transform/yql_dq_task_transform.h new file mode 100644 index 0000000000..bad5401ee4 --- /dev/null +++ b/yql/essentials/core/dq_integration/transform/yql_dq_task_transform.h @@ -0,0 +1,17 @@ +#pragma once + +#include <yql/essentials/minikql/mkql_node_visitor.h> +#include <yql/essentials/minikql/mkql_function_registry.h> + +#include <util/generic/hash.h> +#include <util/generic/string.h> + +#include <functional> + +namespace NYql { + +using TTaskTransformFactory = std::function<NKikimr::NMiniKQL::TCallableVisitFuncProvider(const THashMap<TString, TString>&, const NKikimr::NMiniKQL::IFunctionRegistry*)>; + +TTaskTransformFactory CreateCompositeTaskTransformFactory(TVector<TTaskTransformFactory> factories); + +} // namespace NYql diff --git a/yql/essentials/core/dq_integration/ya.make b/yql/essentials/core/dq_integration/ya.make new file mode 100644 index 0000000000..9e1ba48828 --- /dev/null +++ b/yql/essentials/core/dq_integration/ya.make @@ -0,0 +1,22 @@ +LIBRARY() + +SRCS( + yql_dq_integration.cpp + yql_dq_optimization.cpp +) + +PEERDIR( + contrib/libs/protobuf + library/cpp/yson + yql/essentials/ast + yql/essentials/core + yql/essentials/core/expr_nodes +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE( + transform +) diff --git a/yql/essentials/core/dq_integration/yql_dq_integration.cpp b/yql/essentials/core/dq_integration/yql_dq_integration.cpp new file mode 100644 index 0000000000..ff1d0cfb84 --- /dev/null +++ b/yql/essentials/core/dq_integration/yql_dq_integration.cpp @@ -0,0 +1,24 @@ +#include "yql_dq_integration.h" + +#include <yql/essentials/core/yql_type_annotation.h> + +namespace NYql { + +std::unordered_set<IDqIntegration*> GetUniqueIntegrations(const TTypeAnnotationContext& typesCtx) { + std::unordered_set<IDqIntegration*> uniqueIntegrations(typesCtx.DataSources.size() + typesCtx.DataSinks.size()); + for (const auto& provider : typesCtx.DataSources) { + if (auto* dqIntegration = provider->GetDqIntegration()) { + uniqueIntegrations.emplace(dqIntegration); + } + } + + for (const auto& provider : typesCtx.DataSinks) { + if (auto* dqIntegration = provider->GetDqIntegration()) { + uniqueIntegrations.emplace(dqIntegration); + } + } + + return uniqueIntegrations; +} + +} // namespace NYql diff --git a/yql/essentials/core/dq_integration/yql_dq_integration.h b/yql/essentials/core/dq_integration/yql_dq_integration.h new file mode 100644 index 0000000000..4f08726cf2 --- /dev/null +++ b/yql/essentials/core/dq_integration/yql_dq_integration.h @@ -0,0 +1,84 @@ +#pragma once + +#include <yql/essentials/ast/yql_expr.h> +#include <yql/essentials/core/yql_data_provider.h> +#include <yql/essentials/core/yql_statistics.h> +#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h> +#include <yql/essentials/public/issue/yql_issue.h> + +#include <library/cpp/yson/writer.h> + +#include <util/generic/string.h> +#include <util/generic/vector.h> +#include <util/generic/map.h> +#include <util/generic/maybe.h> + +#include <google/protobuf/any.pb.h> + +namespace NJson { +class TJsonValue; +} // namespace NJson + +namespace NYql { + +struct TDqSettings; +class TTransformationPipeline; + +namespace NCommon { + class TMkqlCallableCompilerBase; +} + +class TFallbackError: public yexception { +public: + TFallbackError(TIssuePtr issue = {}) + : Issue_(std::move(issue)) + {} + + TIssuePtr GetIssue() const { + return Issue_; + } +private: + TIssuePtr Issue_; +}; + +class IDqIntegration { +public: + virtual ~IDqIntegration() {} + + virtual ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node, + TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) = 0; + virtual bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues = false) = 0; + virtual bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues = true) = 0; + virtual TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) = 0; + virtual TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) = 0; + virtual TMaybe<TOptimizerStatistics> ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) = 0; + virtual TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0; + + // Nothing if callable is not for writing, + // false if callable is for writing and there are some errors (they are added to ctx), + // true if callable is for writing and no issues occured. + virtual TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) = 0; + + virtual TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) = 0; + virtual bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) = 0; + virtual void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) = 0; + virtual bool CanFallback() = 0; + virtual void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType, size_t maxPartitions, TExprContext& ctx) = 0; + virtual void FillLookupSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) = 0; + virtual void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) = 0; + virtual void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) = 0; + virtual void Annotate(const TExprNode& node, THashMap<TString, TString>& params) = 0; + virtual bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) = 0; + virtual void WriteFullResultTableRef(NYson::TYsonWriter& writer, const TVector<TString>& columns, const THashMap<TString, TString>& graphParams) = 0; + + // Fill plan operator properties for sources/sinks + // Return true if node was handled + virtual bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0; + virtual bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) = 0; + // Called to configure DQ peephole + virtual void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) = 0; +}; + +std::unordered_set<IDqIntegration*> GetUniqueIntegrations(const TTypeAnnotationContext& typesCtx); + +} // namespace NYql diff --git a/yql/essentials/core/dq_integration/yql_dq_optimization.cpp b/yql/essentials/core/dq_integration/yql_dq_optimization.cpp new file mode 100644 index 0000000000..b7f55b1f36 --- /dev/null +++ b/yql/essentials/core/dq_integration/yql_dq_optimization.cpp @@ -0,0 +1 @@ +#include "yql_dq_optimization.h" diff --git a/yql/essentials/core/dq_integration/yql_dq_optimization.h b/yql/essentials/core/dq_integration/yql_dq_optimization.h new file mode 100644 index 0000000000..80b39bb76a --- /dev/null +++ b/yql/essentials/core/dq_integration/yql_dq_optimization.h @@ -0,0 +1,86 @@ +#pragma once + +#include <yql/essentials/ast/yql_expr.h> + +namespace NYql { + +class IDqOptimization { +public: + virtual ~IDqOptimization() {} + + /** + Rewrite DqReadWrap's underlying provider specific read callable + Args: + * read - provider specific read callable + * ctx - expr context + Returns one of: + * empty TPtr on error + * original `read`, if no changes + * new read, if any optimizations + */ + virtual TExprNode::TPtr RewriteRead(const TExprNode::TPtr& read, TExprContext& ctx) = 0; + + /** + Rewrite DqReadWrap's underlying provider specific read callable for lookup + Args: + * read - provider specific read callable + * ctx - expr context + Returns of of: + * empty TPtr, if lookup read is not supported + * DqLookupSourceWrap callable + */ + virtual TExprNode::TPtr RewriteLookupRead(const TExprNode::TPtr& read, TExprContext& ctx) = 0; + + /** + Apply new members subset for DqReadWrap's underlying provider specific read callable + Args: + * read - provider specific read callable + * members - expr list of atoms with new members + * ctx - expr context + Returns one of: + * empty TPtr on error + * original `read`, if no changes + * new read with applyed new members + */ + virtual TExprNode::TPtr ApplyExtractMembers(const TExprNode::TPtr& read, const TExprNode::TPtr& members, TExprContext& ctx) = 0; + + /** + Apply `take` or `skip` setting for DqReadWrap's underlying provider specific read callable + Args: + * read - provider specific read callable + * countBase - `Take`, `Skip` or `Limit` callable + * ctx - expr context + Returns one of: + * empty TPtr on error + * original `read`, if no changes + * new read with applyed setting + */ + virtual TExprNode::TPtr ApplyTakeOrSkip(const TExprNode::TPtr& read, const TExprNode::TPtr& countBase, TExprContext& ctx) = 0; + + /** + Apply `unordered` setting for DqReadWrap's underlying provider specific read callable + Args: + * read - provider specific read callable + * ctx - expr context + Returns one of: + * empty TPtr on error + * original `read`, if no changes + * new read with applyed setting + */ + virtual TExprNode::TPtr ApplyUnordered(const TExprNode::TPtr& read, TExprContext& ctx) = 0; + + /** + Optimize list/stream extend for set of DqReadWrap's underlying provider specific read callable + Args: + * listOfRead - expr list of provider specific read callables + * ordered - `true` for ordered extend (must keep original order of reads); `false`, if reads may be reordred due the optimization + * ctx - expr context + Returns one of: + * empty list on error + * original `listOfRead`, if no changes + * new optimized list of reads. Returned list length must not be greater than original `listOfRead` length + */ + virtual TExprNode::TListType ApplyExtend(const TExprNode::TListType& listOfRead, bool ordered, TExprContext& ctx) = 0; +}; + +} // namespace NYql diff --git a/yql/essentials/core/ya.make b/yql/essentials/core/ya.make index 556e53a570..2e253709c2 100644 --- a/yql/essentials/core/ya.make +++ b/yql/essentials/core/ya.make @@ -98,6 +98,7 @@ END() RECURSE( cbo credentials + dq_integration file_storage issue minsketch diff --git a/yql/essentials/minikql/comp_nodes/packed_tuple/hashes_calc.h b/yql/essentials/minikql/comp_nodes/packed_tuple/hashes_calc.h deleted file mode 100644 index a33679e4e5..0000000000 --- a/yql/essentials/minikql/comp_nodes/packed_tuple/hashes_calc.h +++ /dev/null @@ -1,65 +0,0 @@ -#pragma once - -#include <contrib/ydb/library/yql/utils/simd/simd.h> - -namespace NKikimr { -namespace NMiniKQL { -namespace NPackedTuple { - - - -template <typename TTraits> -inline ui32 CalculateCRC32(const ui8 * data, ui32 size, ui32 hash = 0 ) { - - using TSimdI8 = typename TTraits::TSimdI8; - - while (size >= 8) { - hash = TSimdI8::CRC32u64(hash, ReadUnaligned<ui64>(data)); - size -= 8; - data += 8; - } - - switch(size) { - case 7: - hash = TSimdI8::CRC32u32(hash, ReadUnaligned<ui32>(data)); - data += 4; - [[fallthrough]]; - case 3: - hash = TSimdI8::CRC32u16(hash, ReadUnaligned<ui16>(data)); - data += 2; - [[fallthrough]]; - case 1: - hash = TSimdI8::CRC32u8(hash, ReadUnaligned<ui8>(data)); - break; - case 6: - hash = TSimdI8::CRC32u32(hash, ReadUnaligned<ui32>(data)); - data += 4; - [[fallthrough]]; - case 2: - hash = TSimdI8::CRC32u16(hash, ReadUnaligned<ui16>(data)); - break; - case 5: - hash = TSimdI8::CRC32u32(hash, ReadUnaligned<ui32>(data)); - data += 4; - hash = TSimdI8::CRC32u8(hash, ReadUnaligned<ui8>(data)); - break; - case 4: - hash = TSimdI8::CRC32u32(hash, ReadUnaligned<ui32>(data)); - break; - case 0: - break; - } - return hash; - -} -template -__attribute__((target("avx2"))) -ui32 CalculateCRC32<NSimd::TSimdAVX2Traits>(const ui8 * data, ui32 size, ui32 hash = 0 ); -template -__attribute__((target("sse4.2"))) -ui32 CalculateCRC32<NSimd::TSimdSSE42Traits>(const ui8 * data, ui32 size, ui32 hash = 0 ); -} - -} - -} diff --git a/yql/essentials/minikql/comp_nodes/packed_tuple/packed_tuple_ut.cpp b/yql/essentials/minikql/comp_nodes/packed_tuple/packed_tuple_ut.cpp deleted file mode 100644 index 25ac2a46c3..0000000000 --- a/yql/essentials/minikql/comp_nodes/packed_tuple/packed_tuple_ut.cpp +++ /dev/null @@ -1,899 +0,0 @@ -#include <yql/essentials/minikql/mkql_runtime_version.h> -#include <yql/essentials/minikql/comp_nodes/ut/mkql_computation_node_ut.h> -#include <library/cpp/testing/unittest/registar.h> - -#include <chrono> -#include <vector> -#include <set> -#include <random> - -#include <util/system/fs.h> -#include <util/system/compiler.h> -#include <util/stream/null.h> -#include <util/system/mem_info.h> - -#include <yql/essentials/minikql/comp_nodes/packed_tuple/hashes_calc.h> -#include <yql/essentials/minikql/comp_nodes/packed_tuple/tuple.h> - -#include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h> - -namespace NKikimr { -namespace NMiniKQL { -namespace NPackedTuple { - -using namespace std::chrono_literals; - -static volatile bool IsVerbose = false; -#define CTEST (IsVerbose ? Cerr : Cnull) - -namespace { - -template <typename TTraits> -void TestCalculateCRC32_Impl() { - std::mt19937_64 rng; // fixed-seed (0) prng - std::vector<ui64> v(1024); - std::generate(v.begin(), v.end(), rng); - - ui64 nanoseconds = 0; - ui64 totalBytes = 0; - ui32 hash = 0; - for (ui32 test = 0; test < 65535; ++test) { - ui32 bytes = rng() % (sizeof(v[0])*v.size()); - - std::chrono::steady_clock::time_point begin01 = std::chrono::steady_clock::now(); - hash = CalculateCRC32<TTraits>((const ui8 *) v.data(), bytes, hash); - std::chrono::steady_clock::time_point end01 = std::chrono::steady_clock::now(); - - nanoseconds += std::chrono::duration_cast<std::chrono::nanoseconds>(end01 - begin01).count(); - totalBytes += bytes; - } - CTEST << "Hash: " << hash << Endl; - UNIT_ASSERT_VALUES_EQUAL(hash, 80113928); - CTEST << "Data Size: " << totalBytes << Endl; - CTEST << "Time for hash: " << ((nanoseconds + 999)/1000) << "[microseconds]" << Endl; - CTEST << "Calculating speed: " << totalBytes / ((nanoseconds + 999)/1000) << "MB/sec" << Endl; -} -} - -Y_UNIT_TEST_SUITE(TestHash) { - -Y_UNIT_TEST(TestCalculateCRC32Fallback) { - TestCalculateCRC32_Impl<NSimd::TSimdFallbackTraits>(); -} - -Y_UNIT_TEST(TestCalculateCRC32SSE42) { - if (NX86::HaveSSE42()) - TestCalculateCRC32_Impl<NSimd::TSimdSSE42Traits>(); - else - CTEST << "Skipped SSE42 test\n"; -} - -Y_UNIT_TEST(TestCalculateCRC32AVX2) { - if (NX86::HaveAVX2()) - TestCalculateCRC32_Impl<NSimd::TSimdAVX2Traits>(); - else - CTEST << "Skipped AVX2 test\n"; -} - -} - -Y_UNIT_TEST_SUITE(TupleLayout) { -Y_UNIT_TEST(CreateLayout) { - - TColumnDesc kc1, kc2, pc1, pc2, pc3; - - kc1.Role = EColumnRole::Key; - kc1.DataSize = 8; - - kc2.Role = EColumnRole::Key; - kc2.DataSize = 4; - - pc1.Role = EColumnRole::Payload; - pc1.DataSize = 16; - - pc2.Role = EColumnRole::Payload; - pc2.DataSize = 4; - - pc3.Role = EColumnRole::Payload; - pc3.DataSize = 8; - - std::vector<TColumnDesc> columns{kc1, kc2, pc1, pc2, pc3}; - - auto tl = TTupleLayout::Create(columns); - UNIT_ASSERT(tl->TotalRowSize == 45); -} - -Y_UNIT_TEST(Pack) { - - TScopedAlloc alloc(__LOCATION__); - - TColumnDesc kc1, kc2, pc1, pc2; - - kc1.Role = EColumnRole::Key; - kc1.DataSize = 8; - - kc2.Role = EColumnRole::Key; - kc2.DataSize = 4; - - pc1.Role = EColumnRole::Payload; - pc1.DataSize = 8; - - pc2.Role = EColumnRole::Payload; - pc2.DataSize = 4; - - std::vector<TColumnDesc> columns{kc1, kc2, pc1, pc2}; - - auto tl = TTupleLayout::Create(columns); - UNIT_ASSERT(tl->TotalRowSize == 29); - - const ui64 NTuples1 = 10e6; - - const ui64 Tuples1DataBytes = (tl->TotalRowSize) * NTuples1; - - std::vector<ui64> col1(NTuples1, 0); - std::vector<ui32> col2(NTuples1, 0); - std::vector<ui64> col3(NTuples1, 0); - std::vector<ui32> col4(NTuples1, 0); - - std::vector<ui8> res(Tuples1DataBytes + 64, 0); - - for (ui32 i = 0; i < NTuples1; ++i) { - col1[i] = i; - col2[i] = i; - col3[i] = i; - col4[i] = i; - } - - const ui8* cols[4]; - - cols[0] = (ui8*) col1.data(); - cols[1] = (ui8*) col2.data(); - cols[2] = (ui8*) col3.data(); - cols[3] = (ui8*) col4.data(); - - std::chrono::steady_clock::time_point begin02 = std::chrono::steady_clock::now(); - - std::vector<ui8> colValid1((NTuples1 + 7)/8, ~0); - std::vector<ui8> colValid2((NTuples1 + 7)/8, ~0); - std::vector<ui8> colValid3((NTuples1 + 7)/8, ~0); - std::vector<ui8> colValid4((NTuples1 + 7)/8, ~0); - const ui8 *colsValid[4] = { - colValid1.data(), - colValid2.data(), - colValid3.data(), - colValid4.data(), - }; - - std::vector<ui8, TMKQLAllocator<ui8>> overflow; - tl->Pack(cols, colsValid, res.data(), overflow, 0, NTuples1); - std::chrono::steady_clock::time_point end02 = std::chrono::steady_clock::now(); - ui64 microseconds = std::chrono::duration_cast<std::chrono::microseconds>(end02 - begin02).count(); - if (microseconds == 0) microseconds = 1; - - CTEST << "Time for " << (NTuples1) << " transpose (external cycle)= " << microseconds << "[microseconds]" << Endl; - CTEST << "Data size = " << Tuples1DataBytes / (1024 * 1024) << "[MB]" << Endl; - CTEST << "Calculating speed = " << Tuples1DataBytes / microseconds << "MB/sec" << Endl; - CTEST << Endl; - - UNIT_ASSERT(true); - -} - -Y_UNIT_TEST(Unpack) { - - TScopedAlloc alloc(__LOCATION__); - - TColumnDesc kc1, kc2, pc1, pc2; - - kc1.Role = EColumnRole::Key; - kc1.DataSize = 8; - - kc2.Role = EColumnRole::Key; - kc2.DataSize = 4; - - pc1.Role = EColumnRole::Payload; - pc1.DataSize = 8; - - pc2.Role = EColumnRole::Payload; - pc2.DataSize = 4; - - std::vector<TColumnDesc> columns{kc1, kc2, pc1, pc2}; - - auto tl = TTupleLayout::Create(columns); - UNIT_ASSERT(tl->TotalRowSize == 29); - - const ui64 NTuples1 = 10e6; - - const ui64 Tuples1DataBytes = (tl->TotalRowSize) * NTuples1; - - std::vector<ui64> col1(NTuples1, 0); - std::vector<ui32> col2(NTuples1, 0); - std::vector<ui64> col3(NTuples1, 0); - std::vector<ui32> col4(NTuples1, 0); - - std::vector<ui8> res(Tuples1DataBytes + 64, 0); - - for (ui32 i = 0; i < NTuples1; ++i) { - col1[i] = i; - col2[i] = i; - col3[i] = i; - col4[i] = i; - } - - const ui8* cols[4]; - - cols[0] = (ui8*) col1.data(); - cols[1] = (ui8*) col2.data(); - cols[2] = (ui8*) col3.data(); - cols[3] = (ui8*) col4.data(); - - std::vector<ui8> colValid1((NTuples1 + 7)/8, ~0); - std::vector<ui8> colValid2((NTuples1 + 7)/8, ~0); - std::vector<ui8> colValid3((NTuples1 + 7)/8, ~0); - std::vector<ui8> colValid4((NTuples1 + 7)/8, ~0); - const ui8 *colsValid[4] = { - colValid1.data(), - colValid2.data(), - colValid3.data(), - colValid4.data(), - }; - - std::vector<ui8, TMKQLAllocator<ui8>> overflow; - tl->Pack(cols, colsValid, res.data(), overflow, 0, NTuples1); - - std::vector<ui64> col1_new(NTuples1, 0); - std::vector<ui32> col2_new(NTuples1, 0); - std::vector<ui64> col3_new(NTuples1, 0); - std::vector<ui32> col4_new(NTuples1, 0); - - ui8* cols_new[4]; - cols_new[0] = (ui8*) col1_new.data(); - cols_new[1] = (ui8*) col2_new.data(); - cols_new[2] = (ui8*) col3_new.data(); - cols_new[3] = (ui8*) col4_new.data(); - - std::vector<ui8> colValid1_new((NTuples1 + 7)/8, 0); - std::vector<ui8> colValid2_new((NTuples1 + 7)/8, 0); - std::vector<ui8> colValid3_new((NTuples1 + 7)/8, 0); - std::vector<ui8> colValid4_new((NTuples1 + 7)/8, 0); - - ui8 *colsValid_new[4] = { - colValid1_new.data(), - colValid2_new.data(), - colValid3_new.data(), - colValid4_new.data(), - }; - - std::chrono::steady_clock::time_point begin02 = std::chrono::steady_clock::now(); - tl->Unpack(cols_new, colsValid_new, res.data(), overflow, 0, NTuples1); - std::chrono::steady_clock::time_point end02 = std::chrono::steady_clock::now(); - ui64 microseconds = std::chrono::duration_cast<std::chrono::microseconds>(end02 - begin02).count(); - - if (microseconds == 0) microseconds = 1; - - CTEST << "Time for " << (NTuples1) << " transpose (external cycle)= " << microseconds << "[microseconds]" << Endl; - CTEST << "Data size = " << Tuples1DataBytes / (1024 * 1024) << "[MB]" << Endl; - CTEST << "Calculating speed = " << Tuples1DataBytes / microseconds << "MB/sec" << Endl; - CTEST << Endl; - - UNIT_ASSERT(std::memcmp(col1.data(), col1_new.data(), sizeof(ui64) * col1.size()) == 0); - UNIT_ASSERT(std::memcmp(col2.data(), col2_new.data(), sizeof(ui32) * col2.size()) == 0); - UNIT_ASSERT(std::memcmp(col3.data(), col3_new.data(), sizeof(ui64) * col3.size()) == 0); - UNIT_ASSERT(std::memcmp(col4.data(), col4_new.data(), sizeof(ui32) * col4.size()) == 0); - - UNIT_ASSERT(std::memcmp(colValid1.data(), colValid1_new.data(), colValid1.size()) == 0); - UNIT_ASSERT(std::memcmp(colValid2.data(), colValid2_new.data(), colValid2.size()) == 0); - UNIT_ASSERT(std::memcmp(colValid3.data(), colValid3_new.data(), colValid3.size()) == 0); - UNIT_ASSERT(std::memcmp(colValid4.data(), colValid4_new.data(), colValid4.size()) == 0); -} - -Y_UNIT_TEST(PackVarSize) { - - TScopedAlloc alloc(__LOCATION__); - - TColumnDesc kc1, kcv1, kcv2, kc2, pc1, pc2; - - kc1.Role = EColumnRole::Key; - kc1.DataSize = 8; - - kc2.Role = EColumnRole::Key; - kc2.DataSize = 4; - - pc1.Role = EColumnRole::Payload; - pc1.DataSize = 8; - - pc2.Role = EColumnRole::Payload; - pc2.DataSize = 4; - - kcv1.Role = EColumnRole::Key; - kcv1.DataSize = 8; - kcv1.SizeType = EColumnSizeType::Variable; - - kcv2.Role = EColumnRole::Key; - kcv2.DataSize = 16; - kcv2.SizeType = EColumnSizeType::Variable; - - pc1.Role = EColumnRole::Payload; - pc1.DataSize = 8; - - pc2.Role = EColumnRole::Payload; - pc2.DataSize = 4; - - std::vector<TColumnDesc> columns{kc1, kc2, kcv1, kcv2, pc1, pc2}; - - auto tl = TTupleLayout::Create(columns); - CTEST << "TotalRowSize = " << tl->TotalRowSize << Endl; - UNIT_ASSERT_VALUES_EQUAL(tl->TotalRowSize, 54); - - const ui64 NTuples1 = 3; - - const ui64 Tuples1DataBytes = (tl->TotalRowSize) * NTuples1; - - std::vector<ui64> col1(NTuples1, 0); - std::vector<ui32> col2(NTuples1, 0); - std::vector<ui64> col3(NTuples1, 0); - std::vector<ui32> col4(NTuples1, 0); - - std::vector<ui32> vcol1(1, 0); - - std::vector<ui8> vcol1data; - std::vector<ui32> vcol2(1, 0); - std::vector<ui8> vcol2data; - - std::vector<ui8> res(Tuples1DataBytes + 64, 0); - std::vector<TString> vcol1str { - "abc", - "ABCDEFGHIJKLMNO", - "ZYXWVUTSPR" - }; - std::vector<TString> vcol2str { - "ABC", - "abcdefghijklmno", - "zyxwvutspr" - }; - for (auto &&str: vcol1str) { - for (auto c: str) - vcol1data.push_back(c); - vcol1.push_back(vcol1data.size()); - } - UNIT_ASSERT_VALUES_EQUAL(vcol1.size(), NTuples1 + 1); - for (auto &&str: vcol2str) { - for (auto c: str) - vcol2data.push_back(c); - vcol2.push_back(vcol2data.size()); - } - UNIT_ASSERT_VALUES_EQUAL(vcol2.size(), NTuples1 + 1); - for (ui32 i = 0; i < NTuples1; ++i) { - col1[i] = (1ull<<(sizeof(col1[0])*8 - 4)) + i + 1; - col2[i] = (2ull<<(sizeof(col2[0])*8 - 4)) + i + 1; - col3[i] = (3ull<<(sizeof(col3[0])*8 - 4)) + i + 1; - col4[i] = (4ull<<(sizeof(col4[0])*8 - 4)) + i + 1; - } - - const ui8* cols[4 + 2*2]; - - cols[0] = (ui8*) col1.data(); - cols[1] = (ui8*) col2.data(); - cols[2] = (ui8*) vcol1.data(); - cols[3] = (ui8*) vcol1data.data(); - cols[4] = (ui8*) vcol2.data(); - cols[5] = (ui8*) vcol2data.data(); - cols[6] = (ui8*) col3.data(); - cols[7] = (ui8*) col4.data(); - - std::vector<ui8, TMKQLAllocator<ui8>> overflow; - std::vector<ui8> colValid((NTuples1 + 7)/8, ~0); - const ui8 *colsValid[8] = { - colValid.data(), - colValid.data(), - colValid.data(), - nullptr, - colValid.data(), - nullptr, - colValid.data(), - colValid.data(), - }; - - std::chrono::steady_clock::time_point begin02 = std::chrono::steady_clock::now(); - tl->Pack(cols, colsValid, res.data(), overflow, 0, NTuples1); - std::chrono::steady_clock::time_point end02 = std::chrono::steady_clock::now(); - ui64 microseconds = std::chrono::duration_cast<std::chrono::microseconds>(end02 - begin02).count(); - - if (microseconds == 0) - microseconds = 1; - - CTEST << "Time for " << (NTuples1) << " transpose (external cycle)= " << microseconds << "[microseconds]" << Endl; -#ifndef NDEBUG - CTEST << "Result size = " << Tuples1DataBytes << Endl; - CTEST << "Result = "; - for (ui32 i = 0; i < Tuples1DataBytes; ++i) - CTEST << int(res[i]) << ' '; - CTEST << Endl; - CTEST << "Overflow size = " << overflow.size() << Endl; - CTEST << "Overflow = "; - for (auto c: overflow) - CTEST << int(c) << ' '; - CTEST << Endl; -#endif - static const ui8 expected_data[54*3] = { - // row1 - - 0xe2,0x47,0x16,0x6c, // hash - 0x1, 0, 0, 0x20, // col1 - 0x1, 0, 0, 0, 0, 0, 0, 0x10, // col2 - 0x3, 0x61, 0x62, 0x63, 0, 0, 0, 0, 0, // vcol1 - 0x3, 0x41, 0x42, 0x43, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // vcol2 - 0x3f, //NULL bitmap - 0x1, 0, 0, 0x40, // col3 - 0x1, 0, 0, 0, 0, 0, 0, 0x30, // col4 - // row2 - 0xc2, 0x1c, 0x1b, 0xa8, // hash - 0x2, 0, 0, 0x20, // col1 - 0x2, 0, 0, 0, 0, 0, 0, 0x10, // col2 - 0xff, 0, 0, 0, 0, 0xf, 0, 0, 0, // vcol1 [overflow offset, overflow size] - 0xf, 0x61, 0x62, 0x63, 0x64, 0x65, 0x66, 0x67, 0x68, 0x69, 0x6a, 0x6b, 0x6c, 0x6d, 0x6e, 0x6f, // vcol2 - 0x3f, // NULL bitmap - 0x2, 0, 0, 0x40, // col3 - 0x2, 0, 0, 0, 0, 0, 0, 0x30, // col4 - // row3 - 0xfa, 0x49, 0x5, 0xe9, // hash - 0x3, 0, 0, 0x20, // col1 - 0x3, 0, 0, 0, 0, 0, 0, 0x10, // col2 - 0xff, 0xf, 0, 0, 0, 0xa, 0, 0, 0, // vcol1 [overflow offset, overflow size] - 0xa, 0x7a, 0x79, 0x78, 0x77, 0x76, 0x75, 0x74, 0x73, 0x70, 0x72, 0, 0, 0, 0, 0, // vcol2 - 0x3f, // NULL bitmap - 0x3, 0, 0, 0x40, // col3 - 0x3, 0, 0, 0, 0, 0, 0, 0x30, // col4 - }; - static const ui8 expected_overflow[25] = { - 0x41, 0x42, 0x43, 0x44, 0x45, 0x46, 0x47, 0x48, 0x49, 0x4a, 0x4b, 0x4c, 0x4d, 0x4e, 0x4f, - 0x5a, 0x59, 0x58, 0x57, 0x56, 0x55, 0x54, 0x53, 0x50, 0x52, - }; - UNIT_ASSERT_VALUES_EQUAL(sizeof(expected_data), tl->TotalRowSize*NTuples1); - UNIT_ASSERT_VALUES_EQUAL(overflow.size(), sizeof(expected_overflow)); - for (ui32 i = 0; i < sizeof(expected_data); ++i) - UNIT_ASSERT_VALUES_EQUAL(expected_data[i], res[i]); - for (ui32 i = 0; i < sizeof(expected_overflow); ++i) - UNIT_ASSERT_VALUES_EQUAL(expected_overflow[i], overflow[i]); -} - -Y_UNIT_TEST(UnpackVarSize) { - - TScopedAlloc alloc(__LOCATION__); - - TColumnDesc kc1, kcv1, kcv2, kc2, pc1, pc2; - - kc1.Role = EColumnRole::Key; - kc1.DataSize = 8; - - kc2.Role = EColumnRole::Key; - kc2.DataSize = 4; - - pc1.Role = EColumnRole::Payload; - pc1.DataSize = 8; - - pc2.Role = EColumnRole::Payload; - pc2.DataSize = 4; - - kcv1.Role = EColumnRole::Key; - kcv1.DataSize = 8; - kcv1.SizeType = EColumnSizeType::Variable; - - kcv2.Role = EColumnRole::Key; - kcv2.DataSize = 16; - kcv2.SizeType = EColumnSizeType::Variable; - - pc1.Role = EColumnRole::Payload; - pc1.DataSize = 8; - - pc2.Role = EColumnRole::Payload; - pc2.DataSize = 4; - - std::vector<TColumnDesc> columns{kc1, kc2, kcv1, kcv2, pc1, pc2}; - - auto tl = TTupleLayout::Create(columns); - CTEST << "TotalRowSize = " << tl->TotalRowSize << Endl; - UNIT_ASSERT_VALUES_EQUAL(tl->TotalRowSize, 54); - - const ui64 NTuples1 = 3; - - const ui64 Tuples1DataBytes = (tl->TotalRowSize) * NTuples1; - - std::vector<ui64> col1(NTuples1, 0); - std::vector<ui32> col2(NTuples1, 0); - std::vector<ui64> col3(NTuples1, 0); - std::vector<ui32> col4(NTuples1, 0); - - std::vector<ui32> vcol1(1, 0); - std::vector<ui8> vcol1data; - std::vector<ui32> vcol2(1, 0); - std::vector<ui8> vcol2data; - - std::vector<ui8> res(Tuples1DataBytes + 64, 0); - std::vector<TString> vcol1str { - "abc", - "ABCDEFGHIJKLMNO", - "ZYXWVUTSPR" - }; - std::vector<TString> vcol2str { - "ABC", - "abcdefghijklmno", - "zyxwvutspr" - }; - for (auto &&str: vcol1str) { - for (auto c: str) - vcol1data.push_back(c); - vcol1.push_back(vcol1data.size()); - } - UNIT_ASSERT_VALUES_EQUAL(vcol1.size(), NTuples1 + 1); - for (auto &&str: vcol2str) { - for (auto c: str) - vcol2data.push_back(c); - vcol2.push_back(vcol2data.size()); - } - UNIT_ASSERT_VALUES_EQUAL(vcol2.size(), NTuples1 + 1); - for (ui32 i = 0; i < NTuples1; ++i) { - col1[i] = (1ull<<(sizeof(col1[0])*8 - 4)) + i + 1; - col2[i] = (2ull<<(sizeof(col2[0])*8 - 4)) + i + 1; - col3[i] = (3ull<<(sizeof(col3[0])*8 - 4)) + i + 1; - col4[i] = (4ull<<(sizeof(col4[0])*8 - 4)) + i + 1; - } - - const ui8* cols[4 + 2*2]; - - cols[0] = (ui8*) col1.data(); - cols[1] = (ui8*) col2.data(); - cols[2] = (ui8*) vcol1.data(); - cols[3] = (ui8*) vcol1data.data(); - cols[4] = (ui8*) vcol2.data(); - cols[5] = (ui8*) vcol2data.data(); - cols[6] = (ui8*) col3.data(); - cols[7] = (ui8*) col4.data(); - - std::vector<ui8, TMKQLAllocator<ui8>> overflow; - std::vector<ui8> colValid((NTuples1 + 7)/8, ~0); - const ui8 *colsValid[8] = { - colValid.data(), - colValid.data(), - colValid.data(), - nullptr, - colValid.data(), - nullptr, - colValid.data(), - colValid.data(), - }; - - tl->Pack(cols, colsValid, res.data(), overflow, 0, NTuples1); - - std::vector<ui64> col1_new(NTuples1, 0); - std::vector<ui32> col2_new(NTuples1, 0); - std::vector<ui64> col3_new(NTuples1, 0); - std::vector<ui32> col4_new(NTuples1, 0); - - std::vector<ui32> vcol1_new(NTuples1 + 1, 0); - std::vector<ui8> vcol1data_new(vcol1data.size()); - std::vector<ui32> vcol2_new(NTuples1 + 1, 0); - std::vector<ui8> vcol2data_new(vcol2data.size()); - - ui8* cols_new[4 + 2 * 2]; - cols_new[0] = (ui8*) col1_new.data(); - cols_new[1] = (ui8*) col2_new.data(); - cols_new[2] = (ui8*) vcol1_new.data(); - cols_new[3] = (ui8*) vcol1data_new.data(); - cols_new[4] = (ui8*) vcol2_new.data(); - cols_new[5] = (ui8*) vcol2data_new.data(); - cols_new[6] = (ui8*) col3_new.data(); - cols_new[7] = (ui8*) col4_new.data(); - - std::vector<ui8> colValid1_new((NTuples1 + 7)/8, 0); - colValid1_new.back() = ~0; - std::vector<ui8> colValid2_new((NTuples1 + 7)/8, 0); - colValid2_new.back() = ~0; - std::vector<ui8> colValid3_new((NTuples1 + 7)/8, 0); - colValid3_new.back() = ~0; - std::vector<ui8> colValid4_new((NTuples1 + 7)/8, 0); - colValid4_new.back() = ~0; - std::vector<ui8> colValid5_new((NTuples1 + 7)/8, 0); - colValid5_new.back() = ~0; - std::vector<ui8> colValid6_new((NTuples1 + 7)/8, 0); - colValid6_new.back() = ~0; - - ui8 *colsValid_new[8] = { - colValid1_new.data(), - colValid2_new.data(), - colValid3_new.data(), - nullptr, - colValid4_new.data(), - nullptr, - colValid5_new.data(), - colValid6_new.data(), - }; - - std::chrono::steady_clock::time_point begin02 = std::chrono::steady_clock::now(); - tl->Unpack(cols_new, colsValid_new, res.data(), overflow, 0, NTuples1); - std::chrono::steady_clock::time_point end02 = std::chrono::steady_clock::now(); - ui64 microseconds = std::chrono::duration_cast<std::chrono::microseconds>(end02 - begin02).count(); - - if (microseconds == 0) - microseconds = 1; - - CTEST << "Time for " << (NTuples1) << " transpose (external cycle)= " << microseconds << "[microseconds]" << Endl; -#ifndef NDEBUG - CTEST << "Result size = " << Tuples1DataBytes << Endl; - CTEST << "Result = "; - for (ui32 i = 0; i < Tuples1DataBytes; ++i) - CTEST << int(res[i]) << ' '; - CTEST << Endl; - CTEST << "Overflow size = " << overflow.size() << Endl; - CTEST << "Overflow = "; - for (auto c: overflow) - CTEST << int(c) << ' '; - CTEST << Endl; -#endif - - UNIT_ASSERT(std::memcmp(cols[0], cols_new[0], sizeof(ui64) * col1.size()) == 0); - UNIT_ASSERT(std::memcmp(cols[1], cols_new[1], sizeof(ui32) * col2.size()) == 0); - UNIT_ASSERT(std::memcmp(cols[2], cols_new[2], sizeof(ui32) * vcol1.size()) == 0); - UNIT_ASSERT(std::memcmp(cols[3], cols_new[3], vcol1data.size()) == 0); - UNIT_ASSERT(std::memcmp(cols[4], cols_new[4], sizeof(ui32) * vcol2.size()) == 0); - UNIT_ASSERT(std::memcmp(cols[5], cols_new[5], vcol1data.size()) == 0); - UNIT_ASSERT(std::memcmp(cols[6], cols_new[6], sizeof(ui64) * col3.size()) == 0); - UNIT_ASSERT(std::memcmp(cols[7], cols_new[7], sizeof(ui32) * col4.size()) == 0); - - UNIT_ASSERT(std::memcmp(colValid.data(), colValid1_new.data(), colValid.size()) == 0); - UNIT_ASSERT(std::memcmp(colValid.data(), colValid2_new.data(), colValid.size()) == 0); - UNIT_ASSERT(std::memcmp(colValid.data(), colValid3_new.data(), colValid.size()) == 0); - UNIT_ASSERT(std::memcmp(colValid.data(), colValid4_new.data(), colValid.size()) == 0); - UNIT_ASSERT(std::memcmp(colValid.data(), colValid5_new.data(), colValid.size()) == 0); - UNIT_ASSERT(std::memcmp(colValid.data(), colValid6_new.data(), colValid.size()) == 0); -} - -Y_UNIT_TEST(PackVarSizeBig) { - - TScopedAlloc alloc(__LOCATION__); - - TColumnDesc kc1, kc2, kcv1; - - kc1.Role = EColumnRole::Key; - kc1.DataSize = 1; - - kc2.Role = EColumnRole::Key; - kc2.DataSize = 2; - - kcv1.Role = EColumnRole::Key; - kcv1.DataSize = 1000; - kcv1.SizeType = EColumnSizeType::Variable; - - std::vector<TColumnDesc> columns{kc1, kc2, kcv1 }; - - auto tl = TTupleLayout::Create(columns); - //CTEST << "TotalRowSize = " << tl->TotalRowSize << Endl; - UNIT_ASSERT_VALUES_EQUAL(tl->TotalRowSize, 263); - - const ui64 NTuples1 = 2; - - const ui64 Tuples1DataBytes = (tl->TotalRowSize) * NTuples1; - - std::vector<ui8> col1(NTuples1, 0); - std::vector<ui16> col2(NTuples1, 0); - - std::vector<ui32> vcol1(1, 0); - - std::vector<ui8> vcol1data; - - std::vector<ui8> res(Tuples1DataBytes + 64, 0); - std::vector<TString> vcol1str { - "zaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" - "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbb" - "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" - "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbabcdefghijklnmorstuvwxy", - "zaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" - "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbb" - "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" - "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbrstuv", - }; - for (auto &&str: vcol1str) { - for (auto c: str) - vcol1data.push_back(c); - vcol1.push_back(vcol1data.size()); - } - UNIT_ASSERT_VALUES_EQUAL(vcol1.size(), NTuples1 + 1); - for (ui32 i = 0; i < NTuples1; ++i) { - col1[i] = (1ull<<(sizeof(col1[0])*8 - 4)) + i + 1; - col2[i] = (2ull<<(sizeof(col2[0])*8 - 4)) + i + 1; - } - - const ui8* cols[2 + 1*2]; - - cols[0] = (ui8*) col1.data(); - cols[1] = (ui8*) col2.data(); - cols[2] = (ui8*) vcol1.data(); - cols[3] = (ui8*) vcol1data.data(); - - std::vector<ui8> colValid((NTuples1 + 7)/8, ~0); - const ui8 *colsValid[2 + 1*2] = { - colValid.data(), - colValid.data(), - colValid.data(), - nullptr, - }; - std::vector<ui8, TMKQLAllocator<ui8>> overflow; - - std::chrono::steady_clock::time_point begin02 = std::chrono::steady_clock::now(); - tl->Pack(cols, colsValid, res.data(), overflow, 0, NTuples1); - std::chrono::steady_clock::time_point end02 = std::chrono::steady_clock::now(); - ui64 microseconds = std::chrono::duration_cast<std::chrono::microseconds>(end02 - begin02).count(); - - CTEST << "Time for " << (NTuples1) << " transpose (external cycle)= " << microseconds << "[microseconds]" << Endl; -#ifndef NDEBUG - CTEST << "Result size = " << Tuples1DataBytes << Endl; - CTEST << "Result = "; - for (ui32 i = 0; i < Tuples1DataBytes; ++i) - CTEST << int(res[i]) << ' '; - CTEST << Endl; - CTEST << "Overflow size = " << overflow.size() << Endl; - CTEST << "Overflow = "; - for (auto c: overflow) - CTEST << int(c) << ' '; - CTEST << Endl; -#endif - static const ui8 expected_data[263*2] = { - // row1 - 0xe1,0x22,0x63,0xf5, // hash - 0x11, // col1 - 0x1, 0x20, // col2 - 0xff, 0, 0, 0, 0, 0xb, 0, 0, 0, // vcol2 [ overflow offset, overflow size ] - 0x7a, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x61, 0x62, 0x63, 0x64, 0x65, 0x66, - 0x67, 0x68, 0x69, 0x6a, 0x6b, 0x6c, - 0x7, // NULL bitmap - // row 2 - 0xab,0xa5,0x5f,0xd4, // hash - 0x12, // col1 - 0x2, 0x20, // col2 - 0xfe, 0x7a, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, - 0x61, 0x61, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, 0x62, - 0x62, 0x62, 0x72, 0x73, 0x74, 0x75, 0x76, - 0x7, // NULLs bitmap - }; - static const ui8 expected_overflow[11] = { - 0x6e, 0x6d, 0x6f, 0x72, 0x73, 0x74, 0x75, 0x76, 0x77, 0x78, 0x79, - }; - UNIT_ASSERT_VALUES_EQUAL(sizeof(expected_data), tl->TotalRowSize*NTuples1); - UNIT_ASSERT_VALUES_EQUAL(overflow.size(), sizeof(expected_overflow)); - for (ui32 i = 0; i < sizeof(expected_data); ++i) - UNIT_ASSERT_VALUES_EQUAL(expected_data[i], res[i]); - for (ui32 i = 0; i < sizeof(expected_overflow); ++i) - UNIT_ASSERT_VALUES_EQUAL(expected_overflow[i], overflow[i]); -} -Y_UNIT_TEST(PackIsValidFuzz) { - - TScopedAlloc alloc(__LOCATION__); - - std::mt19937 rng; // fixed-seed (0) prng - std::vector<TColumnDesc> columns; - std::vector<std::vector<ui8>> colsdata; - std::vector<const ui8*> colsptr; - std::vector<std::vector<ui8>> isValidData; - std::vector<const ui8*> isValidPtr; - - ui64 totalNanoseconds = 0; - ui64 totalSize = 0; - ui64 totalRows = 0; - for (ui32 test = 0; test < 10; ++test) { - ui32 rows = 1 + (rng() % 1000); - ui32 cols = 1 + (rng() % 100); - columns.resize(cols); - colsdata.resize(cols); - colsptr.resize(cols); - isValidData.resize(cols); - isValidPtr.resize(cols); - ui32 isValidSize = (rows + 7)/8; - totalRows += rows; - for (ui32 j = 0; j < cols; ++j) { - auto &col = columns[j]; - col.Role = (rng() % 10 < 1) ? EColumnRole::Key : EColumnRole::Payload; - col.DataSize = 1u <<(rng() % 16); - col.SizeType = EColumnSizeType::Fixed; - colsdata[j].resize(rows*col.DataSize); - colsptr[j] = colsdata[j].data(); - isValidData[j].resize(isValidSize); - isValidPtr[j] = isValidData[j].data(); - std::generate(isValidData[j].begin(), isValidData[j].end(), rng); - } - auto tl = TTupleLayout::Create(columns); - std::vector<ui8> res; - for (ui32 subtest = 0; subtest < 20; ++subtest) { - ui32 subRows = 1 + (rows ? rng() % (rows - 1) : 0); - ui32 off = subRows != rows ? rng() % (rows - subRows) : 0; - std::vector<ui8, TMKQLAllocator<ui8>> overflow; - totalSize += subRows*tl->TotalRowSize; - res.resize(subRows*tl->TotalRowSize); - - std::chrono::steady_clock::time_point begin01 = std::chrono::steady_clock::now(); - tl->Pack(colsptr.data(), isValidPtr.data(), res.data(), overflow, off, subRows); - std::chrono::steady_clock::time_point end01 = std::chrono::steady_clock::now(); - totalNanoseconds += std::chrono::duration_cast<std::chrono::nanoseconds>(end01 - begin01).count(); - - UNIT_ASSERT_VALUES_EQUAL(overflow.size(), 0); - auto resptr = res.data(); - for (ui32 row = 0; row < subRows; ++row, resptr += tl->TotalRowSize) { - for (ui32 j = 0; j < cols; ++j) { - auto &col = tl->Columns[j]; - UNIT_ASSERT_VALUES_EQUAL(((resptr[tl->BitmaskOffset + (j / 8)] >> (j % 8)) & 1), ((isValidData[col.OriginalIndex][(off + row) / 8] >> ((off + row) % 8)) & 1)); - } - } - } - } - - if (totalNanoseconds == 0) totalNanoseconds = 1; - - CTEST << "Time for " << totalRows << " transpose (external cycle)= " << (totalNanoseconds + 999)/1000 << "[microseconds]" << Endl; - CTEST << "Data size = " << totalSize / (1024 * 1024) << "[MB]" << Endl; - CTEST << "Calculating speed = " << totalSize / ((totalNanoseconds + 999)/1000) << "MB/sec" << Endl; - CTEST << Endl; -} -} - - - -} -} // namespace NMiniKQL -} // namespace NKikimr diff --git a/yql/essentials/minikql/comp_nodes/packed_tuple/packing.h b/yql/essentials/minikql/comp_nodes/packed_tuple/packing.h deleted file mode 100644 index 929e11fea4..0000000000 --- a/yql/essentials/minikql/comp_nodes/packed_tuple/packing.h +++ /dev/null @@ -1,424 +0,0 @@ -#include <util/system/unaligned_mem.h> -#include <contrib/ydb/library/yql/utils/simd/simd.h> - -namespace NKikimr { -namespace NMiniKQL { -namespace NPackedTuple { - -static void -PackTupleFallbackRowImpl(const ui8 *const src_cols[], ui8 *const dst_rows, - const size_t cols, const size_t size, - const size_t col_sizes[], const size_t offsets[], - const size_t tuple_size, const size_t start = 0) { - for (size_t row = 0; row != size; ++row) { - for (ui8 col = 0; col != cols; ++col) { - switch (col_sizes[col] * 8) { - -#define MULTY_8x4(...) \ - __VA_ARGS__(8); \ - __VA_ARGS__(16); \ - __VA_ARGS__(32); \ - __VA_ARGS__(64) - -#define CASE(bits) \ - case bits: \ - *reinterpret_cast<ui##bits *>(dst_rows + row * tuple_size + \ - offsets[col]) = \ - *reinterpret_cast<const ui##bits *>(src_cols[col] + \ - (start + row) * (bits / 8)); \ - break - - MULTY_8x4(CASE); - -#undef CASE -#undef MULTY_8x4 - - default: - memcpy(dst_rows + row * tuple_size + offsets[col], - src_cols[col] + (start + row) * col_sizes[col], - col_sizes[col]); - } - } - } -} - -static void -UnpackTupleFallbackRowImpl(const ui8 *const src_rows, ui8 *const dst_cols[], - const size_t cols, const size_t size, - const size_t col_sizes[], const size_t offsets[], - const size_t tuple_size, const size_t start = 0) { - for (size_t row = 0; row != size; ++row) { - for (ui8 col = 0; col != cols; ++col) { - switch (col_sizes[col] * 8) { - -#define MULTY_8x4(...) \ - __VA_ARGS__(8); \ - __VA_ARGS__(16); \ - __VA_ARGS__(32); \ - __VA_ARGS__(64) - -#define CASE(bits) \ - case bits: \ - *reinterpret_cast<ui##bits *>(dst_cols[col] + \ - (start + row) * (bits / 8)) = \ - *reinterpret_cast<const ui##bits *>(src_rows + row * tuple_size + \ - offsets[col]); \ - break - - MULTY_8x4(CASE); - -#undef CASE -#undef MULTY_8x4 - - default: - memcpy(dst_cols[col] + (start + row) * col_sizes[col], - src_rows + row * tuple_size + offsets[col], - col_sizes[col]); - } - } - } -} - -template <class ByteType> -Y_FORCE_INLINE static void -PackTupleFallbackTypedColImpl(const ui8 *const src_col, ui8 *const dst_rows, - const size_t size, const size_t tuple_size, - const size_t start = 0) { - static constexpr size_t BYTES = sizeof(ByteType); - for (size_t row = 0; row != size; ++row) { - WriteUnaligned<ByteType>( - dst_rows + row * tuple_size, - ReadUnaligned<ByteType>(src_col + (start + row) * BYTES)); - } -} - -template <class ByteType> -Y_FORCE_INLINE static void -UnpackTupleFallbackTypedColImpl(const ui8 *const src_rows, ui8 *const dst_col, - const size_t size, const size_t tuple_size, - const size_t start = 0) { - static constexpr size_t BYTES = sizeof(ByteType); - for (size_t row = 0; row != size; ++row) { - WriteUnaligned<ByteType>( - dst_col + (start + row) * BYTES, - ReadUnaligned<ByteType>(src_rows + row * tuple_size)); - } -} - -static void -PackTupleFallbackColImpl(const ui8 *const src_cols[], ui8 *const dst_rows, - const size_t cols, const size_t size, - const size_t col_sizes[], const size_t offsets[], - const size_t tuple_size, const size_t start = 0) { - for (ui8 col = 0; col != cols; ++col) { - switch (col_sizes[col] * 8) { - -#define MULTY_8x4(...) \ - __VA_ARGS__(8); \ - __VA_ARGS__(16); \ - __VA_ARGS__(32); \ - __VA_ARGS__(64) - -#define CASE(bits) \ - case bits: \ - PackTupleFallbackTypedColImpl<ui##bits>( \ - src_cols[col], dst_rows + offsets[col], size, tuple_size, start); \ - break - - MULTY_8x4(CASE); - -#undef CASE -#undef MULTY_8x4 - - default: - for (size_t row = 0; row != size; ++row) { - memcpy(dst_rows + row * tuple_size + offsets[col], - src_cols[col] + (start + row) * col_sizes[col], - col_sizes[col]); - } - } - } -} - -static void -UnpackTupleFallbackColImpl(const ui8 *const src_rows, ui8 *const dst_cols[], - const size_t cols, const size_t size, - const size_t col_sizes[], const size_t offsets[], - const size_t tuple_size, const size_t start = 0) { - for (ui8 col = 0; col != cols; ++col) { - switch (col_sizes[col] * 8) { - -#define MULTY_8x4(...) \ - __VA_ARGS__(8); \ - __VA_ARGS__(16); \ - __VA_ARGS__(32); \ - __VA_ARGS__(64) - -#define CASE(bits) \ - case bits: \ - UnpackTupleFallbackTypedColImpl<ui##bits>( \ - src_rows + offsets[col], dst_cols[col], size, tuple_size, start); \ - break - - MULTY_8x4(CASE); - -#undef CASE -#undef MULTY_8x4 - - default: - for (size_t row = 0; row != size; ++row) { - memcpy(dst_cols[col] + (start + row) * col_sizes[col], - src_rows + row * tuple_size + offsets[col], - col_sizes[col]); - } - } - } -} - -[[maybe_unused]] static void PackTupleFallbackBlockImpl( - const ui8 *const src_cols[], ui8 *const dst_rows, const size_t cols, - const size_t size, const size_t col_sizes[], const size_t offsets[], - const size_t tuple_size, const size_t block_rows, const size_t start = 0) { - - const size_t block_size = size / block_rows; - for (size_t block = 0; block != block_size; ++block) { - for (ui8 col = 0; col != cols; ++col) { - switch (col_sizes[col] * 8) { - -#define BLOCK_LOOP(...) \ - for (size_t block_i = 0; block_i != block_rows; ++block_i) { \ - const size_t row = block_rows * block + block_i; \ - __VA_ARGS__ \ - } - -#define MULTY_8x4(...) \ - __VA_ARGS__(8); \ - __VA_ARGS__(16); \ - __VA_ARGS__(32); \ - __VA_ARGS__(64) - -#define CASE(bits) \ - case bits: \ - PackTupleFallbackTypedColImpl<ui##bits>( \ - src_cols[col], \ - dst_rows + block * block_rows * tuple_size + offsets[col], \ - block_rows, tuple_size, start + block * block_rows); \ - break - - MULTY_8x4(CASE); - - default: - BLOCK_LOOP( - memcpy(dst_rows + row * tuple_size + offsets[col], - src_cols[col] + (start + row) * col_sizes[col], - col_sizes[col]);) - -#undef CASE -#undef MULTY_8x4 -#undef BLOCK_LOOP - } - } - } - - PackTupleFallbackColImpl( - src_cols, dst_rows + block_size * block_rows * tuple_size, cols, - size - block_size * block_rows, col_sizes, offsets, tuple_size, - start + block_size * block_rows); -} - -[[maybe_unused]] static void UnpackTupleFallbackBlockImpl( - const ui8 *const src_rows, ui8 *const dst_cols[], const size_t cols, - const size_t size, const size_t col_sizes[], const size_t offsets[], - const size_t tuple_size, const size_t block_rows, const size_t start = 0) { - - const size_t block_size = size / block_rows; - for (size_t block = 0; block != block_size; ++block) { - for (ui8 col = 0; col != cols; ++col) { - switch (col_sizes[col] * 8) { - -#define BLOCK_LOOP(...) \ - for (size_t block_i = 0; block_i != block_rows; ++block_i) { \ - const size_t row = block_rows * block + block_i; \ - __VA_ARGS__ \ - } - -#define MULTY_8x4(...) \ - __VA_ARGS__(8); \ - __VA_ARGS__(16); \ - __VA_ARGS__(32); \ - __VA_ARGS__(64) - -#define CASE(bits) \ - case bits: \ - UnpackTupleFallbackTypedColImpl<ui##bits>( \ - src_rows + block * block_rows * tuple_size + offsets[col], \ - dst_cols[col], block_rows, tuple_size, \ - start + block * block_rows); \ - break - - MULTY_8x4(CASE); - - default: - BLOCK_LOOP( - memcpy(dst_cols[col] + (start + row) * col_sizes[col], - src_rows + row * tuple_size + offsets[col], - col_sizes[col]);) - -#undef CASE -#undef MULTY_8x4 -#undef BLOCK_LOOP - } - } - } - - UnpackTupleFallbackColImpl(src_rows + block_size * block_rows * tuple_size, - dst_cols, cols, size - block_size * block_rows, - col_sizes, offsets, tuple_size, - start + block_size * block_rows); -} - -template <class TTraits> struct SIMDPack { - template <class T> using TSimd = typename TTraits::template TSimd8<T>; - - static TSimd<ui8> BuildTuplePerm(size_t col_size, size_t col_pad, - ui8 offset, ui8 ind, bool packing) { - ui8 perm[TSimd<ui8>::SIZE]; - std::memset(perm, 0x80, TSimd<ui8>::SIZE); - - size_t iters = std::max(size_t(1u), TSimd<ui8>::SIZE / (col_size + col_pad)); - while (iters--) { - for (size_t it = col_size; it; --it, ++offset, ++ind) { - if (packing) { - perm[offset] = ind; - } else { - perm[ind] = offset; - } - } - offset += col_pad; - } - - return TSimd<ui8>{perm}; - } - - template <ui8 TupleSize> static TSimd<ui8> TupleOr(TSimd<ui8> vec[]) { - return TupleOrImpl<TupleSize>(vec); - } - - template <ui8 TupleSize> static TSimd<ui8> TupleOrImpl(TSimd<ui8> vec[]) { - static constexpr ui8 Left = TupleSize / 2; - static constexpr ui8 Right = TupleSize - Left; - - return TupleOrImpl<Left>(vec) | TupleOrImpl<Right>(vec + Left); - } - - template <> TSimd<ui8> TupleOrImpl<0>(TSimd<ui8>[]) { std::abort(); } - - template <> TSimd<ui8> TupleOrImpl<1>(TSimd<ui8> vec[]) { return vec[0]; } - - template <> TSimd<ui8> TupleOrImpl<2>(TSimd<ui8> vec[]) { - return vec[0] | vec[1]; - } - - template <ui8 StoresPerLoad, ui8 Cols> - static void - PackTupleOrImpl(const ui8 *const src_cols[], ui8 *const dst_rows, - const size_t size, const size_t col_sizes[], - const size_t offsets[], const size_t tuple_size, - const TSimd<ui8> perms[], const size_t start = 0) { - static constexpr size_t kSIMD_Rem = sizeof(TSimd<ui8>) - StoresPerLoad; - const ui8 tuples_per_store = - std::max(size_t(1u), TSimd<ui8>::SIZE / tuple_size); - const size_t simd_iters = (size > kSIMD_Rem ? size - kSIMD_Rem : 0) / - (tuples_per_store * StoresPerLoad); - - TSimd<ui8> src_regs[Cols]; - TSimd<ui8> perm_regs[Cols]; - - const ui8 *srcs[Cols]; - std::memcpy(srcs, src_cols, sizeof(srcs)); - for (ui8 col = 0; col != Cols; ++col) { - srcs[col] += col_sizes[col] * start; - } - - auto dst = dst_rows; - ui8 *const end = dst_rows + simd_iters * tuples_per_store * - StoresPerLoad * tuple_size; - while (dst != end) { - for (ui8 col = 0; col != Cols; ++col) { - src_regs[col] = TSimd<ui8>(srcs[col]); - srcs[col] += col_sizes[col] * tuples_per_store * StoresPerLoad; - } - - for (ui8 iter = 0; iter != StoresPerLoad; ++iter) { - // shuffling each col bytes to the right positions - // then blending them together with 'or' - for (ui8 col = 0; col != Cols; ++col) { - perm_regs[col] = src_regs[col].Shuffle( - perms[col * StoresPerLoad + iter]); - } - - TupleOr<Cols>(perm_regs).Store(dst); - dst += tuple_size * tuples_per_store; - } - } - - PackTupleFallbackRowImpl(srcs, dst, Cols, - size - simd_iters * tuples_per_store * - StoresPerLoad, - col_sizes, offsets, tuple_size); - } - - template <ui8 LoadsPerStore, ui8 Cols> - static void - UnpackTupleOrImpl(const ui8 *const src_rows, ui8 *const dst_cols[], - size_t size, const size_t col_sizes[], - const size_t offsets[], const size_t tuple_size, - const TSimd<ui8> perms[], const size_t start = 0) { - static constexpr size_t kSIMD_Rem = sizeof(TSimd<ui8>) - LoadsPerStore; - const ui8 tuples_per_load = - std::max(size_t(1u), TSimd<ui8>::SIZE / tuple_size); - const size_t simd_iters = (size > kSIMD_Rem ? size - kSIMD_Rem : 0) / - (tuples_per_load * LoadsPerStore); - - TSimd<ui8> src_regs[LoadsPerStore]; - TSimd<ui8> perm_regs[LoadsPerStore]; - - auto src = src_rows; - const ui8 *const end = src_rows + simd_iters * tuples_per_load * - LoadsPerStore * tuple_size; - - ui8 *dsts[Cols]; - std::memcpy(dsts, dst_cols, sizeof(dsts)); - for (ui8 col = 0; col != Cols; ++col) { - dsts[col] += col_sizes[col] * start; - } - - while (src != end) { - for (ui8 iter = 0; iter != LoadsPerStore; ++iter) { - src_regs[iter] = TSimd<ui8>(src); - src += tuple_size * tuples_per_load; - } - - for (ui8 col = 0; col != Cols; ++col) { - // shuffling each col bytes to the right positions - // then blending them together with 'or' - for (ui8 iter = 0; iter != LoadsPerStore; ++iter) { - perm_regs[iter] = src_regs[iter].Shuffle( - perms[col * LoadsPerStore + iter]); - } - - TupleOr<LoadsPerStore>(perm_regs).Store(dsts[col]); - dsts[col] += col_sizes[col] * tuples_per_load * LoadsPerStore; - } - } - - UnpackTupleFallbackRowImpl(src, dsts, Cols, - size - simd_iters * tuples_per_load * - LoadsPerStore, - col_sizes, offsets, tuple_size); - } -}; - -} // namespace NPackedTuple -} // namespace NMiniKQL -} // namespace NKikimr diff --git a/yql/essentials/minikql/comp_nodes/packed_tuple/tuple.cpp b/yql/essentials/minikql/comp_nodes/packed_tuple/tuple.cpp deleted file mode 100644 index 8a51b8c123..0000000000 --- a/yql/essentials/minikql/comp_nodes/packed_tuple/tuple.cpp +++ /dev/null @@ -1,983 +0,0 @@ -#include "tuple.h" - -#include <algorithm> -#include <queue> - -#include <yql/essentials/minikql/mkql_node.h> -#include <yql/essentials/public/udf/udf_data_type.h> -#include <yql/essentials/public/udf/udf_types.h> -#include <yql/essentials/public/udf/udf_value.h> - -#include <util/generic/bitops.h> -#include <util/generic/buffer.h> - -#include "hashes_calc.h" -#include "packing.h" - -namespace NKikimr { -namespace NMiniKQL { -namespace NPackedTuple { - -namespace { - -// Transpose 8x8 bit-matrix packed in ui64 integer -Y_FORCE_INLINE ui64 transposeBitmatrix(ui64 x) { - if (x == 0xFFFFFFFFFFFFFFFFLL) { - return x; - } - - // a b A B aa bb AA BB - // c d C D cc dd CC DD - // -> - // a c A C aa cc AA CC - // b d B D bb dd BB DD - // a b A B aa bb AA BB // c d C D cc dd CC DD - // a c A C aa cc AA CC // b d B D bb dd BB DD - x = ((x & - 0b10101010'01010101'10101010'01010101'10101010'01010101'10101010'01010101ull)) | - ((x & - 0b01010101'00000000'01010101'00000000'01010101'00000000'01010101'00000000ull) >> - 7) | - ((x & - 0b00000000'10101010'00000000'10101010'00000000'10101010'00000000'10101010ull) - << 7); - // a1 a2 b1 b2 A1 A2 B1 B2 - // a3 a4 b3 b4 A3 A4 B3 B4 - // c1 c2 d1 d2 C1 C2 D1 D2 - // c3 c4 d3 d4 C3 C4 D3 D4 - // -> - // a1 a2 c1 c2 A1 A2 C1 C2 - // a3 a4 c3 c4 A3 A4 C3 C4 - // b1 b2 d1 d2 B1 B2 D1 D2 - // b3 b4 d3 d4 B3 B4 D3 D4 - // - // - // a1 a2 b1 b2 A1 A2 B1 B2 // a3 a4 b3 b4 A3 A4 B3 B4 // c1 c2 d1 d2 C1 C2 - // D1 D2 // c3 c4 d3 d4 C3 C4 D3 D4 - // -> - // a1 a2 c1 c2 A1 A2 C1 C2 // a3 a4 c3 c4 A3 A4 C3 C4 // b1 b2 d1 d2 B1 B2 - // D1 D2 // b3 b4 d3 d4 B3 B4 D3 D4 - x = ((x & - 0b1100110011001100'0011001100110011'1100110011001100'0011001100110011ull)) | - ((x & - 0b0011001100110011'0000000000000000'0011001100110011'0000000000000000ull) >> - 14) | - ((x & - 0b0000000000000000'1100110011001100'0000000000000000'1100110011001100ull) - << 14); - x = ((x & - 0b11110000111100001111000011110000'00001111000011110000111100001111ull)) | - ((x & - 0b00001111000011110000111100001111'00000000000000000000000000000000ull) >> - 28) | - ((x & - 0b00000000000000000000000000000000'11110000111100001111000011110000ull) - << 28); - return x; -} - -void transposeBitmatrix(ui8 dst[], const ui8 *src[], const size_t row_size) { - ui64 x = 0; - for (size_t ind = 0; ind != 8; ++ind) { - x |= ui64(*src[ind]) << (ind * 8); - } - - x = transposeBitmatrix(x); - - for (size_t ind = 0; ind != 8; ++ind) { - dst[ind * row_size] = x; - x >>= 8; - } -} - -void transposeBitmatrix(ui8 *dst[], const ui8 src[], const size_t row_size) { - ui64 x = 0; - for (size_t ind = 0; ind != 8; ++ind) { - x |= ui64(src[ind * row_size]) << (ind * 8); - } - - x = transposeBitmatrix(x); - - for (size_t ind = 0; ind != 8; ++ind) { - *dst[ind] = x; - x >>= 8; - } -} - -} // namespace - -THolder<TTupleLayout> -TTupleLayout::Create(const std::vector<TColumnDesc> &columns) { - - if (NX86::HaveAVX2()) - return MakeHolder<TTupleLayoutFallback<NSimd::TSimdAVX2Traits>>( - columns); - - if (NX86::HaveSSE42()) - return MakeHolder<TTupleLayoutFallback<NSimd::TSimdSSE42Traits>>( - columns); - - return MakeHolder<TTupleLayoutFallback<NSimd::TSimdFallbackTraits>>( - columns); -} - -template <typename TTraits> -TTupleLayoutFallback<TTraits>::TTupleLayoutFallback( - const std::vector<TColumnDesc> &columns) - : TTupleLayout(columns) { - - for (ui32 i = 0, idx = 0; i < OrigColumns.size(); ++i) { - auto &col = OrigColumns[i]; - - col.OriginalIndex = idx; - - if (col.SizeType == EColumnSizeType::Variable) { - // we cannot handle (rare) overflow strings unless we have at least - // space for header; size of inlined strings is limited to 254 - // bytes, limit maximum inline data size - col.DataSize = std::max<ui32>(1 + 2 * sizeof(ui32), - std::min<ui32>(255, col.DataSize)); - idx += 2; // Variable-size takes two columns: one for offsets, and - // another for payload - } else { - idx += 1; - } - - if (col.Role == EColumnRole::Key) { - KeyColumns.push_back(col); - } else { - PayloadColumns.push_back(col); - } - } - - KeyColumnsNum = KeyColumns.size(); - - auto ColumnDescLess = [](const TColumnDesc &a, const TColumnDesc &b) { - if (a.SizeType != b.SizeType) // Fixed first - return a.SizeType == EColumnSizeType::Fixed; - - if (a.DataSize == b.DataSize) - // relative order of (otherwise) same key columns must be preserved - return a.OriginalIndex < b.OriginalIndex; - - return a.DataSize < b.DataSize; - }; - - std::sort(KeyColumns.begin(), KeyColumns.end(), ColumnDescLess); - std::sort(PayloadColumns.begin(), PayloadColumns.end(), ColumnDescLess); - - KeyColumnsFixedEnd = 0; - - ui32 currOffset = 4; // crc32 hash in the beginning - KeyColumnsOffset = currOffset; - KeyColumnsFixedNum = KeyColumnsNum; - - for (ui32 i = 0; i < KeyColumnsNum; ++i) { - auto &col = KeyColumns[i]; - - if (col.SizeType == EColumnSizeType::Variable && - KeyColumnsFixedEnd == 0) { - KeyColumnsFixedEnd = currOffset; - KeyColumnsFixedNum = i; - } - - col.ColumnIndex = i; - col.Offset = currOffset; - Columns.push_back(col); - currOffset += col.DataSize; - } - - KeyColumnsEnd = currOffset; - - if (KeyColumnsFixedEnd == 0) // >= 4 if was ever assigned - KeyColumnsFixedEnd = KeyColumnsEnd; - - KeyColumnsSize = KeyColumnsEnd - KeyColumnsOffset; - BitmaskOffset = currOffset; - - BitmaskSize = (OrigColumns.size() + 7) / 8; - - currOffset += BitmaskSize; - BitmaskEnd = currOffset; - - PayloadOffset = currOffset; - - for (ui32 i = 0; i < PayloadColumns.size(); ++i) { - auto &col = PayloadColumns[i]; - col.ColumnIndex = KeyColumnsNum + i; - col.Offset = currOffset; - Columns.push_back(col); - currOffset += col.DataSize; - } - - PayloadEnd = currOffset; - PayloadSize = PayloadEnd - PayloadOffset; - - TotalRowSize = currOffset; - - for (auto &col : Columns) { - if (col.SizeType == EColumnSizeType::Variable) { - VariableColumns_.push_back(col); - } else if (IsPowerOf2(col.DataSize) && - col.DataSize < (1u << FixedPOTColumns_.size())) { - FixedPOTColumns_[CountTrailingZeroBits(col.DataSize)].push_back( - col); - } else { - FixedNPOTColumns_.push_back(col); - } - } - - /// TODO: dynamic configuration - BlockRows_ = 256; - const bool use_simd = true; - - std::vector<const TColumnDesc *> block_fallback; - std::queue<const TColumnDesc *> next_cols; - - size_t fixed_cols_left = - KeyColumnsFixedNum + - std::accumulate(PayloadColumns.begin(), PayloadColumns.end(), 0ul, - [](size_t prev, const auto &col) { - return prev + - (col.SizeType == EColumnSizeType::Fixed); - }); - - size_t prev_tuple_size; - size_t curr_tuple_size = 0; - - const auto manage_block_packing = [&](const std::vector<TColumnDesc> - &columns) { - for (size_t col_ind = 0; - col_ind != columns.size() && - columns[col_ind].SizeType == EColumnSizeType::Fixed;) { - --fixed_cols_left; - next_cols.push(&columns[col_ind]); - prev_tuple_size = curr_tuple_size; - curr_tuple_size = next_cols.back()->Offset + - next_cols.back()->DataSize - - next_cols.front()->Offset; - - ++col_ind; - if (curr_tuple_size >= TSimd<ui8>::SIZE || - next_cols.size() == kSIMDMaxCols || !fixed_cols_left) { - const bool oversize = curr_tuple_size > TSimd<ui8>::SIZE; - const size_t tuple_size = - oversize ? prev_tuple_size : curr_tuple_size; - const size_t tuple_cols = next_cols.size() - oversize; - - if (!use_simd || !tuple_cols || - (Columns.size() != next_cols.size() && - tuple_size < TSimd<ui8>::SIZE * 7 / 8) || - tuple_size > TSimd<ui8>::SIZE || - (!SIMDBlock_.empty() && - TotalRowSize - next_cols.front()->Offset < - TSimd<ui8>::SIZE)) { - block_fallback.push_back(next_cols.front()); - next_cols.pop(); - continue; - } - - SIMDDesc simd_desc; - simd_desc.Cols = tuple_cols; - simd_desc.PermMaskOffset = SIMDPermMasks_.size(); - simd_desc.RowOffset = next_cols.front()->Offset; - - const TColumnDesc *col_descs[kSIMDMaxCols]; - ui32 col_max_size = 0; - for (ui8 col_ind = 0; col_ind != simd_desc.Cols; ++col_ind) { - col_descs[col_ind] = next_cols.front(); - col_max_size = - std::max(col_max_size, col_descs[col_ind]->DataSize); - next_cols.pop(); - } - - simd_desc.InnerLoopIters = std::min( - size_t(kSIMDMaxInnerLoopSize), - (TSimd<ui8>::SIZE / col_max_size) / - std::max(size_t(1u), size_t(TSimd<ui8>::SIZE / TotalRowSize))); - - const auto tuples_per_register = - std::max(1u, TSimd<ui8>::SIZE / TotalRowSize); - - for (ui8 col_ind = 0; col_ind != simd_desc.Cols; ++col_ind) { - const auto &col_desc = col_descs[col_ind]; - const size_t offset = - col_desc->Offset - simd_desc.RowOffset; - - BlockFixedColsSizes_.push_back(col_desc->DataSize); - BlockColsOffsets_.push_back(offset); - BlockColumnsOrigInds_.push_back(col_desc->OriginalIndex); - } - - for (size_t packing_flag = 1; packing_flag != 3; - ++packing_flag) { - for (ui8 col_ind = 0; col_ind != simd_desc.Cols; - ++col_ind) { - const auto &col_desc = col_descs[col_ind]; - const size_t offset = - col_desc->Offset - simd_desc.RowOffset; - - for (ui8 ind = 0; ind != simd_desc.InnerLoopIters; - ++ind) { - SIMDPermMasks_.push_back( - SIMDPack<TTraits>::BuildTuplePerm( - col_desc->DataSize, - TotalRowSize - col_desc->DataSize, offset, - ind * col_desc->DataSize * - tuples_per_register, - packing_flag % 2)); - } - } - } - - SIMDBlock_.push_back(simd_desc); - } - } - - while (!next_cols.empty()) { - block_fallback.push_back(next_cols.front()); - next_cols.pop(); - } - }; - - manage_block_packing(KeyColumns); - manage_block_packing(PayloadColumns); - - for (const auto col_desc_p : block_fallback) { - BlockColsOffsets_.push_back(col_desc_p->Offset); - BlockFixedColsSizes_.push_back(col_desc_p->DataSize); - BlockColumnsOrigInds_.push_back(col_desc_p->OriginalIndex); - } -} - -// Columns (SoA) format: -// for fixed size: packed data -// for variable size: offset (ui32) into next column; size of colum is -// rowCount + 1 -// -// Row (AoS) format: -// fixed size: packed data -// variable size: -// assumes DataSize <= 255 && DataSize >= 1 + 2*4 -// if size of payload is less than col.DataSize: -// u8 one byte of size (0..254) -// u8 [size] data -// u8 [DataSize - 1 - size] padding -// if size of payload is greater than DataSize: -// u8 = 255 -// u32 = offset in overflow buffer -// u32 = size -// u8 [DataSize - 1 - 2*4] initial bytes of data -// Data is expected to be consistent with isValidBitmask (0 for fixed-size, -// empty for variable-size) -template <> -void TTupleLayoutFallback<NSimd::TSimdFallbackTraits>::Pack( - const ui8 **columns, const ui8 **isValidBitmask, ui8 *res, - std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start, - ui32 count) const { - using TTraits = NSimd::TSimdFallbackTraits; - - std::vector<ui64> bitmaskMatrix(BitmaskSize); - - if (auto off = (start % 8)) { - auto bitmaskIdx = start / 8; - - for (ui32 j = Columns.size(); j--;) - bitmaskMatrix[j / 8] |= - ui64(isValidBitmask[Columns[j].OriginalIndex][bitmaskIdx]) - << ((j % 8) * 8); - - for (auto &m : bitmaskMatrix) { - m = transposeBitmatrix(m); - m >>= off * 8; - } - } - - for (; count--; ++start, res += TotalRowSize) { - ui32 hash = 0; - auto bitmaskIdx = start / 8; - - bool anyOverflow = false; - - for (ui32 i = KeyColumnsFixedNum; i < KeyColumns.size(); ++i) { - auto &col = KeyColumns[i]; - ui32 dataOffset = ReadUnaligned<ui32>(columns[col.OriginalIndex] + - sizeof(ui32) * start); - ui32 nextOffset = ReadUnaligned<ui32>(columns[col.OriginalIndex] + - sizeof(ui32) * (start + 1)); - auto size = nextOffset - dataOffset; - - if (size >= col.DataSize) { - anyOverflow = true; - break; - } - } - - if ((start % 8) == 0) { - std::fill(bitmaskMatrix.begin(), bitmaskMatrix.end(), 0); - for (ui32 j = Columns.size(); j--;) - bitmaskMatrix[j / 8] |= - ui64(isValidBitmask[Columns[j].OriginalIndex][bitmaskIdx]) - << ((j % 8) * 8); - for (auto &m : bitmaskMatrix) - m = transposeBitmatrix(m); - } - - for (ui32 j = 0; j < BitmaskSize; ++j) { - res[BitmaskOffset + j] = ui8(bitmaskMatrix[j]); - bitmaskMatrix[j] >>= 8; - } - - for (auto &col : FixedNPOTColumns_) { - std::memcpy(res + col.Offset, - columns[col.OriginalIndex] + start * col.DataSize, - col.DataSize); - } - -#define PackPOTColumn(POT) \ - for (auto &col : FixedPOTColumns_[POT]) { \ - std::memcpy(res + col.Offset, \ - columns[col.OriginalIndex] + start * (1u << POT), \ - 1u << POT); \ - } - - PackPOTColumn(0); - PackPOTColumn(1); - PackPOTColumn(2); - PackPOTColumn(3); - PackPOTColumn(4); -#undef PackPOTColumn - - for (auto &col : VariableColumns_) { - auto dataOffset = ReadUnaligned<ui32>(columns[col.OriginalIndex] + - sizeof(ui32) * start); - auto nextOffset = ReadUnaligned<ui32>(columns[col.OriginalIndex] + - sizeof(ui32) * (start + 1)); - auto size = nextOffset - dataOffset; - auto data = columns[col.OriginalIndex + 1] + dataOffset; - - if (size >= col.DataSize) { - res[col.Offset] = 255; - - ui32 prefixSize = (col.DataSize - 1 - 2 * sizeof(ui32)); - auto overflowSize = size - prefixSize; - auto overflowOffset = overflow.size(); - - overflow.resize(overflowOffset + overflowSize); - - WriteUnaligned<ui32>(res + col.Offset + 1 + 0 * sizeof(ui32), - overflowOffset); - WriteUnaligned<ui32>(res + col.Offset + 1 + 1 * sizeof(ui32), - overflowSize); - std::memcpy(res + col.Offset + 1 + 2 * sizeof(ui32), data, - prefixSize); - std::memcpy(overflow.data() + overflowOffset, data + prefixSize, - overflowSize); - } else { - Y_DEBUG_ABORT_UNLESS(size < 255); - res[col.Offset] = size; - std::memcpy(res + col.Offset + 1, data, size); - std::memset(res + col.Offset + 1 + size, 0, - col.DataSize - (size + 1)); - } - - if (anyOverflow && col.Role == EColumnRole::Key) { - hash = - CalculateCRC32<TTraits>((ui8 *)&size, sizeof(ui32), hash); - hash = CalculateCRC32<TTraits>(data, size, hash); - } - } - - // isValid bitmap is NOT included into hashed data - if (anyOverflow) { - hash = CalculateCRC32<TTraits>( - res + KeyColumnsOffset, KeyColumnsFixedEnd - KeyColumnsOffset, - hash); - } else { - hash = CalculateCRC32<TTraits>(res + KeyColumnsOffset, - KeyColumnsEnd - KeyColumnsOffset); - } - WriteUnaligned<ui32>(res, hash); - } -} - -template <> -void TTupleLayoutFallback<NSimd::TSimdFallbackTraits>::Unpack( - ui8 **columns, ui8 **isValidBitmask, const ui8 *res, - const std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start, - ui32 count) const { - std::vector<ui64> bitmaskMatrix(BitmaskSize, 0); - - { - const auto bitmaskIdx = start / 8; - const auto bitmaskShift = start % 8; - const auto bitmaskIdxC = (start + count) / 8; - const auto bitmaskShiftC = (start + count) % 8; - - /// ready first bitmatrix bytes - for (ui32 j = Columns.size(); j--;) - bitmaskMatrix[j / 8] |= - (isValidBitmask[Columns[j].OriginalIndex][bitmaskIdx] & - ~(0xFF << bitmaskShift)) - << ((j % 8) * 8); - - /// ready last (which are same as above) bitmatrix bytes if needed - if (bitmaskIdx == bitmaskIdxC) - for (ui32 j = Columns.size(); j--;) - bitmaskMatrix[j / 8] |= - (isValidBitmask[Columns[j].OriginalIndex][bitmaskIdxC] & - (0xFF << bitmaskShiftC)) - << ((j % 8) * 8); - - for (auto &m : bitmaskMatrix) - m = transposeBitmatrix(m); - } - - for (auto ind = 0; ind != start % 8; ++ind) { - for (ui32 j = 0; j < BitmaskSize; ++j) { - bitmaskMatrix[j] |= - ui64( - (res - (start % 8 - ind) * TotalRowSize)[BitmaskOffset + j]) - << (ind * 8); - } - } - - for (; count--; ++start, res += TotalRowSize) { - const auto bitmaskIdx = start / 8; - const auto bitmaskShift = start % 8; - - for (ui32 j = 0; j < BitmaskSize; ++j) { - bitmaskMatrix[j] |= ui64(res[BitmaskOffset + j]) - << (bitmaskShift * 8); - } - - if (bitmaskShift == 7 || count == 0) { - for (auto &m : bitmaskMatrix) - m = transposeBitmatrix(m); - for (ui32 j = Columns.size(); j--;) - isValidBitmask[Columns[j].OriginalIndex][bitmaskIdx] = - ui8(bitmaskMatrix[j / 8] >> ((j % 8) * 8)); - std::fill(bitmaskMatrix.begin(), bitmaskMatrix.end(), 0); - - if (count && count < 8) { - /// ready last bitmatrix bytes - for (ui32 j = Columns.size(); j--;) - bitmaskMatrix[j / 8] |= - (isValidBitmask[Columns[j].OriginalIndex] - [bitmaskIdx + 1] & - (0xFF << count)) - << ((j % 8) * 8); - - for (auto &m : bitmaskMatrix) - m = transposeBitmatrix(m); - } - } - - for (auto &col : FixedNPOTColumns_) { - std::memcpy(columns[col.OriginalIndex] + start * col.DataSize, - res + col.Offset, col.DataSize); - } - -#define PackPOTColumn(POT) \ - for (auto &col : FixedPOTColumns_[POT]) { \ - std::memcpy(columns[col.OriginalIndex] + start * (1u << POT), \ - res + col.Offset, 1u << POT); \ - } - PackPOTColumn(0); - PackPOTColumn(1); - PackPOTColumn(2); - PackPOTColumn(3); - PackPOTColumn(4); -#undef PackPOTColumn - - for (auto &col : VariableColumns_) { - const auto dataOffset = ReadUnaligned<ui32>( - columns[col.OriginalIndex] + sizeof(ui32) * start); - auto *const data = columns[col.OriginalIndex + 1] + dataOffset; - - ui32 size = ReadUnaligned<ui8>(res + col.Offset); - - if (size < 255) { // embedded str - std::memcpy(data, res + col.Offset + 1, size); - } else { // overflow buffer used - const auto prefixSize = (col.DataSize - 1 - 2 * sizeof(ui32)); - const auto overflowOffset = ReadUnaligned<ui32>( - res + col.Offset + 1 + 0 * sizeof(ui32)); - const auto overflowSize = ReadUnaligned<ui32>( - res + col.Offset + 1 + 1 * sizeof(ui32)); - - std::memcpy(data, res + col.Offset + 1 + 2 * sizeof(ui32), - prefixSize); - std::memcpy(data + prefixSize, overflow.data() + overflowOffset, - overflowSize); - - size = prefixSize + overflowSize; - } - - WriteUnaligned<ui32>(columns[col.OriginalIndex] + - sizeof(ui32) * (start + 1), - dataOffset + size); - } - } -} - -#define MULTI_8_I(C, i) \ - C(i, 0) C(i, 1) C(i, 2) C(i, 3) C(i, 4) C(i, 5) C(i, 6) C(i, 7) -#define MULTI_8(C, A) \ - C(A, 0) C(A, 1) C(A, 2) C(A, 3) C(A, 4) C(A, 5) C(A, 6) C(A, 7) - -template <typename TTraits> -void TTupleLayoutFallback<TTraits>::Pack( - const ui8 **columns, const ui8 **isValidBitmask, ui8 *res, - std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start, - ui32 count) const { - std::vector<const ui8 *> block_columns; - for (const auto col_ind : BlockColumnsOrigInds_) { - block_columns.push_back(columns[col_ind]); - } - - for (size_t row_ind = 0; row_ind < count; row_ind += BlockRows_) { - const size_t cur_block_size = std::min(count - row_ind, BlockRows_); - size_t cols_past = 0; - - for (const auto &simd_block : SIMDBlock_) { -#define CASE(i, j) \ - case i *kSIMDMaxCols + j: \ - SIMDPack<TTraits>::template PackTupleOrImpl<i + 1, j + 1>( \ - block_columns.data() + cols_past, res + simd_block.RowOffset, \ - cur_block_size, BlockFixedColsSizes_.data() + cols_past, \ - BlockColsOffsets_.data() + cols_past, TotalRowSize, \ - SIMDPermMasks_.data() + simd_block.PermMaskOffset, start); \ - break; - - switch ((simd_block.InnerLoopIters - 1) * kSIMDMaxCols + - simd_block.Cols - 1) { - MULTI_8(MULTI_8_I, CASE) - - default: - std::abort(); - } - -#undef CASE - - cols_past += simd_block.Cols; - } - - PackTupleFallbackColImpl( - block_columns.data() + cols_past, res, - BlockColsOffsets_.size() - cols_past, cur_block_size, - BlockFixedColsSizes_.data() + cols_past, - BlockColsOffsets_.data() + cols_past, TotalRowSize, start); - - for (ui32 cols_ind = 0; cols_ind < Columns.size(); cols_ind += 8) { - const ui8 *bitmasks[8]; - const size_t cols = std::min<size_t>(8ul, Columns.size() - cols_ind); - for (size_t ind = 0; ind != cols; ++ind) { - const auto &col = Columns[cols_ind + ind]; - bitmasks[ind] = isValidBitmask[col.OriginalIndex] + start / 8; - } - const ui8 ones_byte = 0xFF; - for (size_t ind = cols; ind != 8; ++ind) { - // dereferencable + all-ones fast path - bitmasks[ind] = &ones_byte; - } - - const auto advance_masks = [&] { - for (size_t ind = 0; ind != cols; ++ind) { - ++bitmasks[ind]; - } - }; - - const size_t first_full_byte = - std::min<size_t>((8ul - start) & 7, cur_block_size); - size_t block_row_ind = 0; - - const auto simple_mask_transpose = [&](const size_t until) { - for (; block_row_ind < until; ++block_row_ind) { - const auto shift = (start + block_row_ind) % 8; - - const auto new_res = res + block_row_ind * TotalRowSize; - const auto res = new_res; - - res[BitmaskOffset + cols_ind / 8] = 0; - for (size_t col_ind = 0; col_ind != cols; ++col_ind) { - res[BitmaskOffset + cols_ind / 8] |= - ((bitmasks[col_ind][0] >> shift) & 1u) << col_ind; - } - } - }; - - simple_mask_transpose(first_full_byte); - if (first_full_byte) { - advance_masks(); - } - - for (; block_row_ind + 7 < cur_block_size; block_row_ind += 8) { - transposeBitmatrix(res + block_row_ind * TotalRowSize + - BitmaskOffset + cols_ind / 8, - bitmasks, TotalRowSize); - advance_masks(); - } - - simple_mask_transpose(cur_block_size); - } - - for (size_t block_row_ind = 0; block_row_ind != cur_block_size; - ++block_row_ind) { - - const auto new_start = start + block_row_ind; - const auto start = new_start; - - const auto new_res = res + block_row_ind * TotalRowSize; - const auto res = new_res; - - ui32 hash = 0; - bool anyOverflow = false; - - for (ui32 i = KeyColumnsFixedNum; i < KeyColumns.size(); ++i) { - auto &col = KeyColumns[i]; - auto dataOffset = ReadUnaligned<ui32>( - columns[col.OriginalIndex] + sizeof(ui32) * start); - auto nextOffset = ReadUnaligned<ui32>( - columns[col.OriginalIndex] + sizeof(ui32) * (start + 1)); - auto size = nextOffset - dataOffset; - - if (size >= col.DataSize) { - anyOverflow = true; - break; - } - } - - for (auto &col : VariableColumns_) { - auto dataOffset = ReadUnaligned<ui32>( - columns[col.OriginalIndex] + sizeof(ui32) * start); - auto nextOffset = ReadUnaligned<ui32>( - columns[col.OriginalIndex] + sizeof(ui32) * (start + 1)); - auto size = nextOffset - dataOffset; - auto data = columns[col.OriginalIndex + 1] + dataOffset; - if (size >= col.DataSize) { - res[col.Offset] = 255; - - auto prefixSize = (col.DataSize - 1 - 2 * sizeof(ui32)); - auto overflowSize = size - prefixSize; - auto overflowOffset = overflow.size(); - - overflow.resize(overflowOffset + overflowSize); - - WriteUnaligned<ui32>(res + col.Offset + 1 + - 0 * sizeof(ui32), - overflowOffset); - WriteUnaligned<ui32>( - res + col.Offset + 1 + 1 * sizeof(ui32), overflowSize); - std::memcpy(res + col.Offset + 1 + 2 * sizeof(ui32), data, - prefixSize); - std::memcpy(overflow.data() + overflowOffset, - data + prefixSize, overflowSize); - } else { - Y_DEBUG_ABORT_UNLESS(size < 255); - res[col.Offset] = size; - std::memcpy(res + col.Offset + 1, data, size); - std::memset(res + col.Offset + 1 + size, 0, - col.DataSize - (size + 1)); - } - if (anyOverflow && col.Role == EColumnRole::Key) { - hash = CalculateCRC32<TTraits>((ui8 *)&size, sizeof(ui32), - hash); - hash = CalculateCRC32<TTraits>(data, size, hash); - } - } - - // isValid bitmap is NOT included into hashed data - if (anyOverflow) { - hash = CalculateCRC32<TTraits>( - res + KeyColumnsOffset, - KeyColumnsFixedEnd - KeyColumnsOffset, hash); - } else { - hash = CalculateCRC32<TTraits>( - res + KeyColumnsOffset, KeyColumnsEnd - KeyColumnsOffset); - } - WriteUnaligned<ui32>(res, hash); - } - - start += cur_block_size; - res += cur_block_size * TotalRowSize; - } -} - -template <typename TTraits> -void TTupleLayoutFallback<TTraits>::Unpack( - ui8 **columns, ui8 **isValidBitmask, const ui8 *res, - const std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start, - ui32 count) const { - - std::vector<ui8 *> block_columns; - for (const auto col_ind : BlockColumnsOrigInds_) { - block_columns.push_back(columns[col_ind]); - } - - for (size_t row_ind = 0; row_ind < count; row_ind += BlockRows_) { - const size_t cur_block_size = std::min(count - row_ind, BlockRows_); - size_t cols_past = 0; - - for (const auto &simd_block : SIMDBlock_) { -#define CASE(i, j) \ - case i *kSIMDMaxCols + j: \ - SIMDPack<TTraits>::template UnpackTupleOrImpl<i + 1, j + 1>( \ - res + simd_block.RowOffset, block_columns.data() + cols_past, \ - cur_block_size, BlockFixedColsSizes_.data() + cols_past, \ - BlockColsOffsets_.data() + cols_past, TotalRowSize, \ - SIMDPermMasks_.data() + simd_block.PermMaskOffset + i * j, start); \ - break; - - switch ((simd_block.InnerLoopIters - 1) * kSIMDMaxCols + - simd_block.Cols - 1) { - MULTI_8(MULTI_8_I, CASE) - - default: - std::abort(); - } - -#undef CASE - - cols_past += simd_block.Cols; - } - - UnpackTupleFallbackColImpl( - res, block_columns.data() + cols_past, - BlockColsOffsets_.size() - cols_past, cur_block_size, - BlockFixedColsSizes_.data() + cols_past, - BlockColsOffsets_.data() + cols_past, TotalRowSize, start); - - for (ui32 cols_ind = 0; cols_ind < Columns.size(); cols_ind += 8) { - ui8 *bitmasks[8]; - const size_t cols = std::min<size_t>(8ul, Columns.size() - cols_ind); - for (size_t ind = 0; ind != cols; ++ind) { - const auto &col = Columns[cols_ind + ind]; - bitmasks[ind] = isValidBitmask[col.OriginalIndex] + start / 8; - } - ui8 trash_byte; - for (size_t ind = cols; ind != 8; ++ind) { - bitmasks[ind] = &trash_byte; // dereferencable - } - - const auto advance_masks = [&] { - for (size_t ind = 0; ind != cols; ++ind) { - ++bitmasks[ind]; - } - }; - - const size_t first_full_byte = - std::min<size_t>((8ul - start) & 7, cur_block_size); - size_t block_row_ind = 0; - - const auto simple_mask_transpose = [&](const size_t until) { - for (size_t col_ind = 0; - block_row_ind != until && col_ind != cols; ++col_ind) { - auto col_bitmask = - bitmasks[col_ind][0] & ~((0xFF << (block_row_ind & 7)) ^ - (0xFF << (until & 7))); - - for (size_t row_ind = block_row_ind; row_ind < until; - ++row_ind) { - const auto shift = (start + row_ind) % 8; - - const auto new_res = res + row_ind * TotalRowSize; - const auto res = new_res; - - col_bitmask |= - ((res[BitmaskOffset + cols_ind / 8] >> col_ind) & - 1u) - << shift; - } - - bitmasks[col_ind][0] = col_bitmask; - } - block_row_ind = until; - }; - - simple_mask_transpose(first_full_byte); - if (first_full_byte) { - advance_masks(); - } - - for (; block_row_ind + 7 < cur_block_size; block_row_ind += 8) { - transposeBitmatrix(bitmasks, - res + block_row_ind * TotalRowSize + - BitmaskOffset + cols_ind / 8, - TotalRowSize); - advance_masks(); - } - - simple_mask_transpose(cur_block_size); - } - - for (size_t block_row_ind = 0; block_row_ind != cur_block_size; - ++block_row_ind) { - - const auto new_start = start + block_row_ind; - const auto start = new_start; - - const auto new_res = res + block_row_ind * TotalRowSize; - const auto res = new_res; - - for (auto &col : VariableColumns_) { - const auto dataOffset = ReadUnaligned<ui32>( - columns[col.OriginalIndex] + sizeof(ui32) * start); - auto *const data = columns[col.OriginalIndex + 1] + dataOffset; - - ui32 size = ReadUnaligned<ui8>(res + col.Offset); - - if (size < 255) { // embedded str - std::memcpy(data, res + col.Offset + 1, size); - } else { // overflow buffer used - const auto prefixSize = - (col.DataSize - 1 - 2 * sizeof(ui32)); - const auto overflowOffset = ReadUnaligned<ui32>( - res + col.Offset + 1 + 0 * sizeof(ui32)); - const auto overflowSize = ReadUnaligned<ui32>( - res + col.Offset + 1 + 1 * sizeof(ui32)); - - std::memcpy(data, res + col.Offset + 1 + 2 * sizeof(ui32), - prefixSize); - std::memcpy(data + prefixSize, - overflow.data() + overflowOffset, overflowSize); - - size = prefixSize + overflowSize; - } - - WriteUnaligned<ui32>(columns[col.OriginalIndex] + - sizeof(ui32) * (start + 1), - dataOffset + size); - } - } - - start += cur_block_size; - res += cur_block_size * TotalRowSize; - } -} - -template __attribute__((target("avx2"))) void -TTupleLayoutFallback<NSimd::TSimdAVX2Traits>::Pack( - const ui8 **columns, const ui8 **isValidBitmask, ui8 *res, - std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start, - ui32 count) const; -template __attribute__((target("sse4.2"))) void -TTupleLayoutFallback<NSimd::TSimdSSE42Traits>::Pack( - const ui8 **columns, const ui8 **isValidBitmask, ui8 *res, - std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start, - ui32 count) const; - -template __attribute__((target("avx2"))) void -TTupleLayoutFallback<NSimd::TSimdAVX2Traits>::Unpack( - ui8 **columns, ui8 **isValidBitmask, const ui8 *res, - const std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start, - ui32 count) const; -template __attribute__((target("sse4.2"))) void -TTupleLayoutFallback<NSimd::TSimdSSE42Traits>::Unpack( - ui8 **columns, ui8 **isValidBitmask, const ui8 *res, - const std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start, - ui32 count) const; - -} // namespace NPackedTuple -} // namespace NMiniKQL -} // namespace NKikimr diff --git a/yql/essentials/minikql/comp_nodes/packed_tuple/tuple.h b/yql/essentials/minikql/comp_nodes/packed_tuple/tuple.h deleted file mode 100644 index 3c6b76750c..0000000000 --- a/yql/essentials/minikql/comp_nodes/packed_tuple/tuple.h +++ /dev/null @@ -1,136 +0,0 @@ -#pragma once - -#include <yql/essentials/minikql/mkql_node.h> -#include <yql/essentials/public/udf/udf_data_type.h> -#include <yql/essentials/public/udf/udf_types.h> - -#include <util/generic/buffer.h> - -#include <util/system/cpu_id.h> -#include <contrib/ydb/library/yql/utils/simd/simd.h> - -namespace NKikimr { -namespace NMiniKQL { -namespace NPackedTuple { - -// Defines if data type of particular column variable or fixed -enum class EColumnSizeType { Fixed, Variable }; - -// Defines if particular column is key column or payload column -enum class EColumnRole { Key, Payload }; - -// Describes layout and size of particular column -struct TColumnDesc { - ui32 ColumnIndex = 0; // Index of the column in particular layout - ui32 OriginalIndex = 0; // Index of the column in input representation - EColumnRole Role = EColumnRole::Payload; // Role of the particular column in - // tuple (Key or Payload) - EColumnSizeType SizeType = - EColumnSizeType::Fixed; // Fixed size or variable size column - ui32 DataSize = 0; // Size of the column in bytes for fixed size part - // Must be same for matching key columns - ui32 Offset = - 0; // Offset in bytes for column value from the beginning of tuple -}; - -// Defines in memory layout of tuple. -struct TTupleLayout { - std::vector<TColumnDesc> OrigColumns; // Columns description and order as - // passed during layout construction - std::vector<TColumnDesc> Columns; // Vector describing all columns in order - // corresponding to tuple layout - std::vector<TColumnDesc> KeyColumns; // Vector describing key columns - std::vector<TColumnDesc> - PayloadColumns; // Vector describing payload columns - ui32 KeyColumnsNum; // Total number of key columns - ui32 KeyColumnsSize; // Total size of all key columns in bytes - ui32 KeyColumnsOffset; // Start of row-packed keys data - ui32 KeyColumnsFixedEnd; // Offset in row-packed keys data of first variable - // key (can be same as KeyColumnsEnd, if there are - // none) - ui32 KeyColumnsFixedNum; // Number of fixed-size columns - ui32 KeyColumnsEnd; // First byte after key columns. Start of bitmask for - // row-based columns - ui32 BitmaskSize; // Size of bitmask for null values flag in columns - ui32 BitmaskOffset; // Offset of nulls bitmask. = KeyColumnsEnd - ui32 BitmaskEnd; // First byte after bitmask. = PayloadOffset - ui32 PayloadSize; // Total size in bytes of the payload columns - ui32 PayloadOffset; // Offset of payload values. = BitmaskEnd. - ui32 PayloadEnd; // First byte after payload - ui32 TotalRowSize; // Total size of bytes for packed row - - // Creates new tuple layout based on provided columns description. - static THolder<TTupleLayout> - Create(const std::vector<TColumnDesc> &columns); - - TTupleLayout(const std::vector<TColumnDesc> &columns) - : OrigColumns(columns) {} - virtual ~TTupleLayout() {} - - // Takes array of pointer to columns, array of validity bitmaps, - // outputs packed rows - virtual void Pack(const ui8 **columns, const ui8 **isValidBitmask, ui8 *res, - std::vector<ui8, TMKQLAllocator<ui8>> &overflow, - ui32 start, ui32 count) const = 0; - - // Takes packed rows, - // outputs array of pointer to columns, array of validity bitmaps - virtual void Unpack(ui8 **columns, ui8 **isValidBitmask, const ui8 *res, - const std::vector<ui8, TMKQLAllocator<ui8>> &overflow, - ui32 start, ui32 count) const = 0; -}; - -template <typename TTrait> struct TTupleLayoutFallback : public TTupleLayout { - - TTupleLayoutFallback(const std::vector<TColumnDesc> &columns); - - void Pack(const ui8 **columns, const ui8 **isValidBitmask, ui8 *res, - std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start, - ui32 count) const override; - - void Unpack(ui8 **columns, ui8 **isValidBitmask, const ui8 *res, - const std::vector<ui8, TMKQLAllocator<ui8>> &overflow, - ui32 start, ui32 count) const override; - - private: - std::array<std::vector<TColumnDesc>, 5> - FixedPOTColumns_; // Fixed-size columns for power-of-two sizes from 1 to - // 16 bytes - std::vector<TColumnDesc> FixedNPOTColumns_; // Remaining fixed-size columns - std::vector<TColumnDesc> VariableColumns_; // Variable-size columns only - using TSimdI8 = typename TTrait::TSimdI8; - template <class T> using TSimd = typename TTrait::template TSimd8<T>; - - static constexpr ui8 kSIMDMaxCols = 8; - static constexpr ui8 kSIMDMaxInnerLoopSize = 8; - - size_t BlockRows_; // Estimated rows per cache block - std::vector<size_t> BlockColsOffsets_; - std::vector<size_t> BlockFixedColsSizes_; - std::vector<size_t> BlockColumnsOrigInds_; - - struct SIMDDesc { - ui8 InnerLoopIters; - ui8 Cols; - size_t PermMaskOffset; - size_t RowOffset; - }; - std::vector<SIMDDesc> SIMDBlock_; // SIMD iterations description - std::vector<TSimd<ui8>> SIMDPermMasks_; // SIMD precomputed masks -}; - -template <> -void TTupleLayoutFallback<NSimd::TSimdFallbackTraits>::Pack( - const ui8 **columns, const ui8 **isValidBitmask, ui8 *res, - std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start, - ui32 count) const; - -template <> -void TTupleLayoutFallback<NSimd::TSimdFallbackTraits>::Unpack( - ui8 **columns, ui8 **isValidBitmask, const ui8 *res, - const std::vector<ui8, TMKQLAllocator<ui8>> &overflow, ui32 start, - ui32 count) const; - -} // namespace NPackedTuple -} // namespace NMiniKQL -} // namespace NKikimr diff --git a/yql/essentials/minikql/comp_nodes/packed_tuple/ut/ya.make b/yql/essentials/minikql/comp_nodes/packed_tuple/ut/ya.make deleted file mode 100644 index 4d27ab3b99..0000000000 --- a/yql/essentials/minikql/comp_nodes/packed_tuple/ut/ya.make +++ /dev/null @@ -1,41 +0,0 @@ -UNITTEST_FOR(yql/essentials/minikql/comp_nodes/packed_tuple) - -IF (SANITIZER_TYPE OR NOT OPENSOURCE) - REQUIREMENTS(ram:32) -ENDIF() - -IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND) - TIMEOUT(3600) - SIZE(LARGE) - TAG(ya:fat) -ELSE() - TIMEOUT(600) - SIZE(MEDIUM) -ENDIF() - - -SRCS( - packed_tuple_ut.cpp -) - -PEERDIR( - yql/essentials/public/udf - yql/essentials/public/udf/arrow - yql/essentials/public/udf/service/exception_policy - yql/essentials/sql/pg_dummy -) - -CFLAGS( - -mprfchw -) - -YQL_LAST_ABI_VERSION() - -IF (MKQL_RUNTIME_VERSION) - CFLAGS( - -DMKQL_RUNTIME_VERSION=$MKQL_RUNTIME_VERSION - ) -ENDIF() - - -END() diff --git a/yql/essentials/minikql/comp_nodes/packed_tuple/ya.make b/yql/essentials/minikql/comp_nodes/packed_tuple/ya.make deleted file mode 100644 index fa3ffbf859..0000000000 --- a/yql/essentials/minikql/comp_nodes/packed_tuple/ya.make +++ /dev/null @@ -1,28 +0,0 @@ -LIBRARY() - -SRCS( - tuple.cpp -) - -PEERDIR( - contrib/libs/apache/arrow - yql/essentials/types/binary_json - yql/essentials/minikql - yql/essentials/utils - yql/essentials/utils/log - library/cpp/digest/crc32c -) - -CFLAGS( - -mprfchw - -mavx2 - -DMKQL_DISABLE_CODEGEN -) - -YQL_LAST_ABI_VERSION() - -END() - -RECURSE_FOR_TESTS( - ut -) diff --git a/yql/essentials/minikql/comp_nodes/ya.make b/yql/essentials/minikql/comp_nodes/ya.make index 1b2cc49327..fa7a96b61a 100644 --- a/yql/essentials/minikql/comp_nodes/ya.make +++ b/yql/essentials/minikql/comp_nodes/ya.make @@ -13,7 +13,6 @@ END() RECURSE( llvm14 no_llvm - packed_tuple ) RECURSE_FOR_TESTS( diff --git a/yql/essentials/minikql/comp_nodes/ya.make.inc b/yql/essentials/minikql/comp_nodes/ya.make.inc index 89b315fa39..3c1531eac7 100644 --- a/yql/essentials/minikql/comp_nodes/ya.make.inc +++ b/yql/essentials/minikql/comp_nodes/ya.make.inc @@ -160,7 +160,6 @@ PEERDIR( yql/essentials/public/udf/arrow yql/essentials/parser/pg_wrapper/interface yql/essentials/utils - contrib/ydb/library/actors/core yql/essentials/public/issue/protos ) diff --git a/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp b/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp index 0a28de9162..400f77e9d1 100644 --- a/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp +++ b/yql/essentials/minikql/computation/mkql_computation_pattern_cache_ut.cpp @@ -11,7 +11,6 @@ #include <yql/essentials/minikql/computation/mkql_computation_node_impl.h> #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> #include <yql/essentials/minikql/comp_nodes/mkql_factories.h> -#include <contrib/ydb/library/yql/dq/proto/dq_tasks.pb.h> #include <library/cpp/testing/unittest/registar.h> diff --git a/yql/essentials/minikql/computation/mkql_spiller_factory.h b/yql/essentials/minikql/computation/mkql_spiller_factory.h index c9c1bfab38..0d6dffe673 100644 --- a/yql/essentials/minikql/computation/mkql_spiller_factory.h +++ b/yql/essentials/minikql/computation/mkql_spiller_factory.h @@ -2,7 +2,9 @@ #include "mkql_spiller.h" -#include <contrib/ydb/library/yql/dq/actors/spilling/spilling_counters.h> +namespace NYql::NDq { +struct TSpillingTaskCounters; +} namespace NKikimr::NMiniKQL { @@ -11,7 +13,7 @@ class ISpillerFactory : private TNonCopyable public: virtual ISpiller::TPtr CreateSpiller() = 0; - virtual void SetTaskCounters(TIntrusivePtr<NYql::NDq::TSpillingTaskCounters> spillingTaskCounters) = 0; + virtual void SetTaskCounters(const TIntrusivePtr<NYql::NDq::TSpillingTaskCounters>& spillingTaskCounters) = 0; virtual ~ISpillerFactory(){} }; diff --git a/yql/essentials/minikql/computation/mock_spiller_factory_ut.h b/yql/essentials/minikql/computation/mock_spiller_factory_ut.h index 9ea74569b1..c053b2c52e 100644 --- a/yql/essentials/minikql/computation/mock_spiller_factory_ut.h +++ b/yql/essentials/minikql/computation/mock_spiller_factory_ut.h @@ -8,7 +8,7 @@ namespace NKikimr::NMiniKQL { class TMockSpillerFactory : public ISpillerFactory { public: - void SetTaskCounters(TIntrusivePtr<NYql::NDq::TSpillingTaskCounters> /*spillingTaskCounters*/) override { + void SetTaskCounters(const TIntrusivePtr<NYql::NDq::TSpillingTaskCounters>& /*spillingTaskCounters*/) override { } ISpiller::TPtr CreateSpiller() override { diff --git a/yql/essentials/minikql/computation/ut/ya.make.inc b/yql/essentials/minikql/computation/ut/ya.make.inc index 0b419140e2..4de4b13fc5 100644 --- a/yql/essentials/minikql/computation/ut/ya.make.inc +++ b/yql/essentials/minikql/computation/ut/ya.make.inc @@ -30,7 +30,6 @@ PEERDIR( library/cpp/threading/local_executor yql/essentials/parser/pg_wrapper yql/essentials/public/udf/service/exception_policy - contrib/ydb/library/yql/dq/proto ) YQL_LAST_ABI_VERSION() diff --git a/yql/essentials/minikql/invoke_builtins/mkql_builtins_convert.cpp b/yql/essentials/minikql/invoke_builtins/mkql_builtins_convert.cpp index 78d62d423d..0f2a676190 100644 --- a/yql/essentials/minikql/invoke_builtins/mkql_builtins_convert.cpp +++ b/yql/essentials/minikql/invoke_builtins/mkql_builtins_convert.cpp @@ -572,12 +572,13 @@ struct TStringConvert { }; NUdf::TUnboxedValuePod JsonToJsonDocument(const NUdf::TUnboxedValuePod value) { - auto binaryJson = NKikimr::NBinaryJson::SerializeToBinaryJson(value.AsStringRef()); - if (!binaryJson.IsSuccess()) { + auto maybeBinaryJson = NKikimr::NBinaryJson::SerializeToBinaryJson(value.AsStringRef()); + if (std::holds_alternative<TString>(maybeBinaryJson)) { // JSON parse error happened, return NULL return NUdf::TUnboxedValuePod(); } - return MakeString(TStringBuf(binaryJson->Data(), binaryJson->Size())); + const auto& binaryJson = std::get<NKikimr::NBinaryJson::TBinaryJson>(maybeBinaryJson); + return MakeString(TStringBuf(binaryJson.Data(), binaryJson.Size())); } struct TJsonToJsonDocumentConvert { diff --git a/yql/essentials/minikql/jsonpath/ut/test_base.cpp b/yql/essentials/minikql/jsonpath/ut/test_base.cpp index feceecddb1..c9b4f5669d 100644 --- a/yql/essentials/minikql/jsonpath/ut/test_base.cpp +++ b/yql/essentials/minikql/jsonpath/ut/test_base.cpp @@ -26,7 +26,7 @@ void TJsonPathTestBase::RunTestCase(const TString& rawJson, const TString& rawJs try { const auto unboxedValueJson = TValue(ParseJson(rawJson)); - const auto binaryJson = *SerializeToBinaryJson(rawJson);; + const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(rawJson)); auto reader = TBinaryJsonReader::Make(binaryJson); auto binaryJsonRoot = TValue(reader->GetRootCursor()); @@ -77,7 +77,7 @@ void TJsonPathTestBase::RunRuntimeErrorTestCase(const TString& rawJson, const TS try { const auto unboxedValueJson = TValue(ParseJson(rawJson)); - const auto binaryJson = *SerializeToBinaryJson(rawJson); + const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(rawJson)); auto reader = TBinaryJsonReader::Make(binaryJson); auto binaryJsonRoot = TValue(reader->GetRootCursor()); @@ -105,7 +105,7 @@ void TJsonPathTestBase::RunVariablesTestCase(const TString& rawJson, const THash try { const auto unboxedValueJson = TValue(ParseJson(rawJson)); - const auto binaryJson = *SerializeToBinaryJson(rawJson); + const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(rawJson)); auto reader = TBinaryJsonReader::Make(binaryJson); auto binaryJsonRoot = TValue(reader->GetRootCursor()); @@ -120,7 +120,7 @@ void TJsonPathTestBase::RunVariablesTestCase(const TString& rawJson, const THash storage.reserve(variables.size()); readers.reserve(variables.size()); for (const auto& it : variables) { - storage.push_back(*SerializeToBinaryJson(it.second)); + storage.push_back(std::get<TBinaryJson>(SerializeToBinaryJson(it.second))); readers.push_back(TBinaryJsonReader::Make(storage.back())); binaryJsonVariables[it.first] = TValue(readers.back()->GetRootCursor()); } diff --git a/yql/essentials/minikql/mkql_type_ops.cpp b/yql/essentials/minikql/mkql_type_ops.cpp index 6b823dbdf5..11762e96c4 100644 --- a/yql/essentials/minikql/mkql_type_ops.cpp +++ b/yql/essentials/minikql/mkql_type_ops.cpp @@ -2518,11 +2518,12 @@ NUdf::TUnboxedValuePod ValueFromString(NUdf::EDataSlot type, NUdf::TStringRef bu case NUdf::EDataSlot::JsonDocument: { auto binaryJson = NKikimr::NBinaryJson::SerializeToBinaryJson(buf); - if (binaryJson.IsFail()) { + if (std::holds_alternative<TString>(binaryJson)) { // JSON parse error happened, return NULL return NUdf::TUnboxedValuePod(); } - return MakeString(TStringBuf(binaryJson->Data(), binaryJson->Size())); + const auto& value = std::get<NKikimr::NBinaryJson::TBinaryJson>(binaryJson); + return MakeString(TStringBuf(value.Data(), value.Size())); } case NUdf::EDataSlot::Decimal: diff --git a/yql/essentials/providers/common/dq/ya.make b/yql/essentials/providers/common/dq/ya.make new file mode 100644 index 0000000000..e0ce0e804a --- /dev/null +++ b/yql/essentials/providers/common/dq/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + yql_dq_integration_impl.cpp + yql_dq_optimization_impl.cpp +) + +PEERDIR( + yql/essentials/core/dq_integration +) + +END() diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp new file mode 100644 index 0000000000..600e3c72e2 --- /dev/null +++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.cpp @@ -0,0 +1,96 @@ +#include "yql_dq_integration_impl.h" + +namespace NYql { + +ui64 TDqIntegrationBase::Partition(const TDqSettings&, size_t, const TExprNode&, + TVector<TString>&, TString*, TExprContext&, bool) { + return 0; +} + +bool TDqIntegrationBase::CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues) { + Y_UNUSED(skipIssues); + Y_UNUSED(node); + Y_UNUSED(ctx); + return true; +} + +bool TDqIntegrationBase::CanRead(const TExprNode&, TExprContext&, bool) { + return false; +} + +TMaybe<ui64> TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TVector<const TExprNode*> &, TExprContext&) { + return Nothing(); +} + +TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext&) { + return read; +} + +TMaybe<TOptimizerStatistics> TDqIntegrationBase::ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) { + Y_UNUSED(readWrap); + Y_UNUSED(ctx); + return Nothing(); +} + +TExprNode::TPtr TDqIntegrationBase::RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) { + Y_UNUSED(ctx); + return write; +} + +TMaybe<bool> TDqIntegrationBase::CanWrite(const TExprNode&, TExprContext&) { + return Nothing(); +} + +bool TDqIntegrationBase::CanBlockRead(const NNodes::TExprBase&, TExprContext&, TTypeAnnotationContext&) { + return false; +} + +TExprNode::TPtr TDqIntegrationBase::WrapWrite(const TExprNode::TPtr& write, TExprContext&) { + return write; +} + +void TDqIntegrationBase::RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase&) { +} + +bool TDqIntegrationBase::CanFallback() { + return false; +} + +void TDqIntegrationBase::FillSourceSettings(const TExprNode&, ::google::protobuf::Any&, TString&, size_t, TExprContext&) { +} + +void TDqIntegrationBase::FillLookupSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) { + Y_UNUSED(node); + Y_UNUSED(settings); + Y_UNUSED(sourceType); + YQL_ENSURE(false); +} + +void TDqIntegrationBase::FillSinkSettings(const TExprNode&, ::google::protobuf::Any&, TString&) { +} + +void TDqIntegrationBase::FillTransformSettings(const TExprNode&, ::google::protobuf::Any&) { +} + +void TDqIntegrationBase::Annotate(const TExprNode&, THashMap<TString, TString>&) { +} + +bool TDqIntegrationBase::PrepareFullResultTableParams(const TExprNode&, TExprContext&, THashMap<TString, TString>&, THashMap<TString, TString>&) { + return false; +} + +void TDqIntegrationBase::WriteFullResultTableRef(NYson::TYsonWriter&, const TVector<TString>&, const THashMap<TString, TString>&) { +} + +bool TDqIntegrationBase::FillSourcePlanProperties(const NNodes::TExprBase&, TMap<TString, NJson::TJsonValue>&) { + return false; +} + +bool TDqIntegrationBase::FillSinkPlanProperties(const NNodes::TExprBase&, TMap<TString, NJson::TJsonValue>&) { + return false; +} + +void TDqIntegrationBase::ConfigurePeepholePipeline(bool, const THashMap<TString, TString>&, TTransformationPipeline*) { +} + +} // namespace NYql diff --git a/yql/essentials/providers/common/dq/yql_dq_integration_impl.h b/yql/essentials/providers/common/dq/yql_dq_integration_impl.h new file mode 100644 index 0000000000..7cc5fd7d89 --- /dev/null +++ b/yql/essentials/providers/common/dq/yql_dq_integration_impl.h @@ -0,0 +1,37 @@ +#pragma once + +#include <yql/essentials/core/dq_integration/yql_dq_integration.h> + +namespace NYql { + +class TDqIntegrationBase: public IDqIntegration { +public: + ui64 Partition(const TDqSettings& config, size_t maxPartitions, const TExprNode& node, + TVector<TString>& partitions, TString* clusterName, TExprContext& ctx, bool canFallback) override; + bool CheckPragmas(const TExprNode& node, TExprContext& ctx, bool skipIssues) override; + bool CanRead(const TExprNode& read, TExprContext& ctx, bool skipIssues) override; + TMaybe<ui64> EstimateReadSize(ui64 dataSizePerJob, ui32 maxTasksPerStage, const TVector<const TExprNode*>& nodes, TExprContext& ctx) override; + TExprNode::TPtr WrapRead(const TDqSettings& config, const TExprNode::TPtr& read, TExprContext& ctx) override; + TMaybe<TOptimizerStatistics> ReadStatistics(const TExprNode::TPtr& readWrap, TExprContext& ctx) override; + TExprNode::TPtr RecaptureWrite(const TExprNode::TPtr& write, TExprContext& ctx) override; + void RegisterMkqlCompiler(NCommon::TMkqlCallableCompilerBase& compiler) override; + TMaybe<bool> CanWrite(const TExprNode& write, TExprContext& ctx) override; + bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx) override; + TExprNode::TPtr WrapWrite(const TExprNode::TPtr& write, TExprContext& ctx) override; + bool CanFallback() override; + void FillSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType, size_t, TExprContext&) override; + void FillLookupSourceSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sourceType) override; + void FillSinkSettings(const TExprNode& node, ::google::protobuf::Any& settings, TString& sinkType) override; + void FillTransformSettings(const TExprNode& node, ::google::protobuf::Any& settings) override; + void Annotate(const TExprNode& node, THashMap<TString, TString>& params) override; + bool PrepareFullResultTableParams(const TExprNode& root, TExprContext& ctx, THashMap<TString, TString>& params, THashMap<TString, TString>& secureParams) override; + void WriteFullResultTableRef(NYson::TYsonWriter& writer, const TVector<TString>& columns, const THashMap<TString, TString>& graphParams) override; + bool FillSourcePlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override; + bool FillSinkPlanProperties(const NNodes::TExprBase& node, TMap<TString, NJson::TJsonValue>& properties) override; + void ConfigurePeepholePipeline(bool beforeDqTransforms, const THashMap<TString, TString>& params, TTransformationPipeline* pipeline) override; + +protected: + bool CanBlockReadTypes(const TStructExprType* node); +}; + +} // namespace NYql diff --git a/yql/essentials/providers/common/dq/yql_dq_optimization_impl.cpp b/yql/essentials/providers/common/dq/yql_dq_optimization_impl.cpp new file mode 100644 index 0000000000..6afa622f05 --- /dev/null +++ b/yql/essentials/providers/common/dq/yql_dq_optimization_impl.cpp @@ -0,0 +1,29 @@ +#include "yql_dq_optimization_impl.h" + +namespace NYql { + +TExprNode::TPtr TDqOptimizationBase::RewriteRead(const TExprNode::TPtr& read, TExprContext& /*ctx*/) { + return read; +} + +TExprNode::TPtr TDqOptimizationBase::RewriteLookupRead(const TExprNode::TPtr& /*read*/, TExprContext& /*ctx*/) { + return {}; +} + +TExprNode::TPtr TDqOptimizationBase::ApplyExtractMembers(const TExprNode::TPtr& read, const TExprNode::TPtr& /*members*/, TExprContext& /*ctx*/) { + return read; +} + +TExprNode::TPtr TDqOptimizationBase::ApplyTakeOrSkip(const TExprNode::TPtr& read, const TExprNode::TPtr& /*countBase*/, TExprContext& /*ctx*/) { + return read; +} + +TExprNode::TPtr TDqOptimizationBase::ApplyUnordered(const TExprNode::TPtr& read, TExprContext& /*ctx*/) { + return read; +} + +TExprNode::TListType TDqOptimizationBase::ApplyExtend(const TExprNode::TListType& reads, bool /*ordered*/, TExprContext& /*ctx*/) { + return reads; +} + +} // namespace NYql diff --git a/yql/essentials/providers/common/dq/yql_dq_optimization_impl.h b/yql/essentials/providers/common/dq/yql_dq_optimization_impl.h new file mode 100644 index 0000000000..1427b50b0a --- /dev/null +++ b/yql/essentials/providers/common/dq/yql_dq_optimization_impl.h @@ -0,0 +1,17 @@ +#pragma once + +#include <yql/essentials/core/dq_integration/yql_dq_optimization.h> + +namespace NYql { + +class TDqOptimizationBase: public IDqOptimization { +public: + TExprNode::TPtr RewriteRead(const TExprNode::TPtr& read, TExprContext& ctx) override; + TExprNode::TPtr RewriteLookupRead(const TExprNode::TPtr& read, TExprContext& ctx) override; + TExprNode::TPtr ApplyExtractMembers(const TExprNode::TPtr& read, const TExprNode::TPtr& members, TExprContext& ctx) override; + TExprNode::TPtr ApplyTakeOrSkip(const TExprNode::TPtr& read, const TExprNode::TPtr& countBase, TExprContext& ctx) override; + TExprNode::TPtr ApplyUnordered(const TExprNode::TPtr& read, TExprContext& ctx) override; + TExprNode::TListType ApplyExtend(const TExprNode::TListType& reads, bool ordered, TExprContext& ctx) override; +}; + +} // namespace NYql diff --git a/yql/essentials/providers/common/ya.make b/yql/essentials/providers/common/ya.make index 1af0f9eddb..eba3397675 100644 --- a/yql/essentials/providers/common/ya.make +++ b/yql/essentials/providers/common/ya.make @@ -4,6 +4,7 @@ RECURSE( codec comp_nodes config + dq gateway metrics mkql diff --git a/yql/essentials/providers/pg/provider/ya.make b/yql/essentials/providers/pg/provider/ya.make index c117f7a975..6c0f6c0d03 100644 --- a/yql/essentials/providers/pg/provider/ya.make +++ b/yql/essentials/providers/pg/provider/ya.make @@ -17,8 +17,8 @@ YQL_LAST_ABI_VERSION() PEERDIR( yql/essentials/core yql/essentials/core/type_ann - contrib/ydb/library/yql/dq/integration - contrib/ydb/library/yql/providers/common/dq + yql/essentials/core/dq_integration + yql/essentials/providers/common/dq yql/essentials/providers/common/provider yql/essentials/providers/common/transform yql/essentials/providers/pg/expr_nodes diff --git a/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp b/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp index 9940898e9b..2e8a9b434f 100644 --- a/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp +++ b/yql/essentials/providers/pg/provider/yql_pg_dq_integration.cpp @@ -1,5 +1,5 @@ #include "yql_pg_provider_impl.h" -#include <contrib/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h> +#include <yql/essentials/providers/common/dq/yql_dq_integration_impl.h> #include <yql/essentials/providers/pg/expr_nodes/yql_pg_expr_nodes.h> namespace NYql { diff --git a/yql/essentials/providers/pg/provider/yql_pg_provider_impl.h b/yql/essentials/providers/pg/provider/yql_pg_provider_impl.h index f63b54b5c2..c4cc036bae 100644 --- a/yql/essentials/providers/pg/provider/yql_pg_provider_impl.h +++ b/yql/essentials/providers/pg/provider/yql_pg_provider_impl.h @@ -4,7 +4,7 @@ #include <yql/essentials/core/yql_data_provider.h> #include <yql/essentials/providers/common/transform/yql_visit.h> #include <yql/essentials/providers/common/transform/yql_exec.h> -#include <contrib/ydb/library/yql/dq/integration/yql_dq_integration.h> +#include <yql/essentials/core/dq_integration/yql_dq_integration.h> #include <util/generic/ptr.h> diff --git a/yql/essentials/types/binary_json/ut/container_ut.cpp b/yql/essentials/types/binary_json/ut/container_ut.cpp index 92144169b4..869d2323bd 100644 --- a/yql/essentials/types/binary_json/ut/container_ut.cpp +++ b/yql/essentials/types/binary_json/ut/container_ut.cpp @@ -35,7 +35,7 @@ public: }; for (const auto& testCase : testCases) { - const auto binaryJson = *SerializeToBinaryJson(testCase.first); + const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.first)); const auto reader = TBinaryJsonReader::Make(binaryJson); const auto container = reader->GetRootCursor(); @@ -52,7 +52,7 @@ public: }; for (const auto& testCase : testCases) { - const auto binaryJson = *SerializeToBinaryJson(testCase.first); + const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.first)); const auto reader = TBinaryJsonReader::Make(binaryJson); const auto container = reader->GetRootCursor(); @@ -78,7 +78,7 @@ public: }; for (const auto& testCase : testCases) { - const auto binaryJson = *SerializeToBinaryJson(testCase.Json); + const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.Json)); const auto reader = TBinaryJsonReader::Make(binaryJson); const auto container = reader->GetRootCursor(); const auto element = container.GetElement(testCase.Index); @@ -100,7 +100,7 @@ public: }; for (const auto& testCase : testCases) { - const auto binaryJson = *SerializeToBinaryJson(testCase.Json); + const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.Json)); const auto reader = TBinaryJsonReader::Make(binaryJson); const auto container = reader->GetRootCursor(); @@ -145,7 +145,7 @@ public: }; for (const auto& testCase : testCases) { - const auto binaryJson = *SerializeToBinaryJson(testCase.Json); + const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.Json)); const auto reader = TBinaryJsonReader::Make(binaryJson); const auto container = reader->GetRootCursor(); const auto result = container.Lookup(testCase.Key); @@ -186,7 +186,7 @@ public: }; for (const auto& testCase : testCases) { - const auto binaryJson = *SerializeToBinaryJson(testCase.Json); + const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.Json)); const auto reader = TBinaryJsonReader::Make(binaryJson); const auto container = reader->GetRootCursor(); diff --git a/yql/essentials/types/binary_json/ut/entry_ut.cpp b/yql/essentials/types/binary_json/ut/entry_ut.cpp index 998bd64f63..0f099ed59d 100644 --- a/yql/essentials/types/binary_json/ut/entry_ut.cpp +++ b/yql/essentials/types/binary_json/ut/entry_ut.cpp @@ -33,7 +33,7 @@ public: }; for (const auto& testCase : testCases) { - const auto binaryJson = *SerializeToBinaryJson(testCase.first); + const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.first)); const auto reader = TBinaryJsonReader::Make(binaryJson); const auto container = reader->GetRootCursor(); @@ -50,7 +50,7 @@ public: }; for (const auto& testCase : testCases) { - const auto binaryJson = *SerializeToBinaryJson(testCase.first); + const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.first)); const auto reader = TBinaryJsonReader::Make(binaryJson); const auto container = reader->GetRootCursor(); const auto innerContainer = container.GetElement(0).GetContainer(); @@ -67,7 +67,7 @@ public: }; for (const auto& testCase : testCases) { - const auto binaryJson = *SerializeToBinaryJson(testCase.first); + const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.first)); const auto reader = TBinaryJsonReader::Make(binaryJson); const auto container = reader->GetRootCursor(); @@ -86,7 +86,7 @@ public: }; for (const auto& testCase : testCases) { - const auto binaryJson = *SerializeToBinaryJson(testCase.first); + const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(testCase.first)); const auto reader = TBinaryJsonReader::Make(binaryJson); const auto container = reader->GetRootCursor(); diff --git a/yql/essentials/types/binary_json/ut/identity_ut.cpp b/yql/essentials/types/binary_json/ut/identity_ut.cpp index 58086d9537..50c4299d5c 100644 --- a/yql/essentials/types/binary_json/ut/identity_ut.cpp +++ b/yql/essentials/types/binary_json/ut/identity_ut.cpp @@ -40,7 +40,7 @@ public: void TestReadToJsonDom() { for (const TStringBuf json : TestCases) { - const auto binaryJson = *NBinaryJson::SerializeToBinaryJson(json); + const auto binaryJson = std::get<TBinaryJson>(NBinaryJson::SerializeToBinaryJson(json)); const auto value = NBinaryJson::ReadToJsonDom(binaryJson, &ValueBuilder); const auto jsonAfterBinaryJson = NDom::SerializeJsonDom(value); @@ -50,7 +50,7 @@ public: void TestSerializeToJson() { for (const TStringBuf json : TestCases) { - const auto binaryJson = *NBinaryJson::SerializeToBinaryJson(json); + const auto binaryJson = std::get<TBinaryJson>(NBinaryJson::SerializeToBinaryJson(json)); const auto jsonAfterBinaryJson = NBinaryJson::SerializeToJson(binaryJson); UNIT_ASSERT_VALUES_EQUAL(json, jsonAfterBinaryJson); diff --git a/yql/essentials/types/binary_json/ut/valid_ut.cpp b/yql/essentials/types/binary_json/ut/valid_ut.cpp index f8b1661866..3078375877 100644 --- a/yql/essentials/types/binary_json/ut/valid_ut.cpp +++ b/yql/essentials/types/binary_json/ut/valid_ut.cpp @@ -45,7 +45,7 @@ public: }; for (const TStringBuf json : testCases) { - const auto binaryJson = *SerializeToBinaryJson(json); + const auto binaryJson = std::get<TBinaryJson>(SerializeToBinaryJson(json)); const TStringBuf buffer(binaryJson.Data(), binaryJson.Size()); const auto error = IsValidBinaryJsonWithError(buffer); UNIT_ASSERT_C(!error.Defined(), TStringBuilder() << "BinaryJson for '" << json << "' is invalid because of '" << *error << "'"); diff --git a/yql/essentials/types/binary_json/write.cpp b/yql/essentials/types/binary_json/write.cpp index 9eecc0cba3..28dffea9db 100644 --- a/yql/essentials/types/binary_json/write.cpp +++ b/yql/essentials/types/binary_json/write.cpp @@ -715,26 +715,31 @@ template <typename TOnDemandValue> } } -TConclusion<TBinaryJson> SerializeToBinaryJsonImpl(const TStringBuf json) { +std::variant<TBinaryJson, TString> SerializeToBinaryJsonImpl(const TStringBuf json) { + std::variant<TBinaryJson, TString> res; TBinaryJsonCallbacks callbacks(/* throwException */ false); const simdjson::padded_string paddedJson(json); simdjson::ondemand::parser parser; try { auto doc = parser.iterate(paddedJson); if (auto status = doc.error(); status != simdjson::SUCCESS) { - return TConclusionStatus::Fail(simdjson::error_message(status)); + res = TString(simdjson::error_message(status)); + return res; } if (auto status = SimdJsonToJsonIndex(doc.value_unsafe(), callbacks); status != simdjson::SUCCESS) { - return TConclusionStatus::Fail(simdjson::error_message(status)); + res = TString(simdjson::error_message(status)); + return res; } } catch (const simdjson::simdjson_error& e) { - return TConclusionStatus::Fail(e.what()); + res = TString(e.what()); + return res; } TBinaryJsonSerializer serializer(std::move(callbacks).GetResult()); - return std::move(serializer).Serialize(); + res = std::move(serializer).Serialize(); + return res; } -TConclusion<TBinaryJson> SerializeToBinaryJson(const TStringBuf json) { +std::variant<TBinaryJson, TString> SerializeToBinaryJson(const TStringBuf json) { return SerializeToBinaryJsonImpl(json); } diff --git a/yql/essentials/types/binary_json/write.h b/yql/essentials/types/binary_json/write.h index 0083637963..c4f8f89d2d 100644 --- a/yql/essentials/types/binary_json/write.h +++ b/yql/essentials/types/binary_json/write.h @@ -2,17 +2,18 @@ #include "format.h" -#include <contrib/ydb/library/conclusion/result.h> #include <yql/essentials/minikql/dom/node.h> #include <util/generic/maybe.h> +#include <variant> + namespace NKikimr::NBinaryJson { /** * @brief Translates textual JSON into BinaryJson */ -TConclusion<TBinaryJson> SerializeToBinaryJson(const TStringBuf json); +std::variant<TBinaryJson, TString> SerializeToBinaryJson(const TStringBuf json); /** * @brief Translates DOM layout from `yql/library/dom` library into BinaryJson |