diff options
author | aneporada <aneporada@ydb.tech> | 2023-05-26 19:25:46 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-05-26 19:25:46 +0300 |
commit | b9ecca1fb6edc7910b784daa6eb8e24a67325e6e (patch) | |
tree | 6ba5c7a2a75d32cfb439ebb26f9c47e9e90302de | |
parent | 7fb69b50f57423be98cc6d9b9cd15eb341080c59 (diff) | |
download | ydb-b9ecca1fb6edc7910b784daa6eb8e24a67325e6e.tar.gz |
Do not create minikql types in serialization functions
initial
9 files changed, 44 insertions, 43 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp index 08864c6556d..940e899211f 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp @@ -48,13 +48,13 @@ using namespace NKikimr; template<bool FastPack> class TDqOutputChannel : public IDqOutputChannel { public: - TDqOutputChannel(ui64 channelId, NMiniKQL::TType* outputType, const NMiniKQL::TTypeEnvironment& typeEnv, + TDqOutputChannel(ui64 channelId, NMiniKQL::TType* outputType, const NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& logFunc) : ChannelId(channelId) , OutputType(outputType) , BasicStats(ChannelId) , ProfileStats(settings.CollectProfileStats ? &BasicStats : nullptr) - , Packer(false, NMiniKQL::TListType::Create(OutputType, typeEnv)) + , Packer(OutputType) , Storage(settings.ChannelStorage) , HolderFactory(holderFactory) , TransportVersion(settings.TransportVersion) @@ -363,17 +363,17 @@ private: IDqOutputChannel::TPtr CreateDqOutputChannel(ui64 channelId, NKikimr::NMiniKQL::TType* outputType, - const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& logFunc) { if (settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0 || settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED) { - return new TDqOutputChannel<false>(channelId, outputType, typeEnv, holderFactory, settings, logFunc); + return new TDqOutputChannel<false>(channelId, outputType, holderFactory, settings, logFunc); } else { YQL_ENSURE(settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0, "Unsupported transport version " << (ui32)settings.TransportVersion); - return new TDqOutputChannel<true>(channelId, outputType, typeEnv, holderFactory, settings, logFunc); + return new TDqOutputChannel<true>(channelId, outputType, holderFactory, settings, logFunc); } } diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.h b/ydb/library/yql/dq/runtime/dq_output_channel.h index 49b65c2eda7..001ebb9bfc0 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.h +++ b/ydb/library/yql/dq/runtime/dq_output_channel.h @@ -83,7 +83,7 @@ struct TDqOutputChannelSettings { }; IDqOutputChannel::TPtr CreateDqOutputChannel(ui64 channelId, NKikimr::NMiniKQL::TType* outputType, - const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& logFunc = {}); } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp index fac3102ea74..20800da3223 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp @@ -91,7 +91,7 @@ void TestSingleRead(TTestContext& ctx) { settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log); + auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log); for (i32 i = 0; i < 10; ++i) { auto row = ctx.CreateRow(i); @@ -131,7 +131,7 @@ void TestPartialRead(TTestContext& ctx) { settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log); + auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log); for (i32 i = 0; i < 9; ++i) { auto row = ctx.CreateRow(i); @@ -184,7 +184,7 @@ void TestOverflow(TTestContext& ctx) { settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log); + auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log); for (i32 i = 0; i < 8; ++i) { auto row = ctx.CreateRow(i); @@ -212,7 +212,7 @@ void TestPopAll(TTestContext& ctx) { settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log); + auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log); for (i32 i = 0; i < 50; ++i) { auto row = ctx.CreateRow(i); @@ -249,7 +249,7 @@ void TestBigRow(TTestContext& ctx) { settings.CollectProfileStats = true; settings.TransportVersion = ctx.TransportVersion; - auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log); + auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log); { auto row = ctx.CreateRow(1); @@ -318,7 +318,7 @@ void TestSpillWithMockStorage(TTestContext& ctx) { auto storage = MakeIntrusive<TMockChannelStorage>(100'500ul); settings.ChannelStorage = storage; - auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log); + auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log); for (i32 i = 0; i < 35; ++i) { auto row = ctx.CreateRow(i); @@ -394,7 +394,7 @@ void TestOverflowWithMockStorage(TTestContext& ctx) { auto storage = MakeIntrusive<TMockChannelStorage>(500ul); settings.ChannelStorage = storage; - auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log); + auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log); for (i32 i = 0; i < 42; ++i) { auto row = ctx.CreateRow(i); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index c31f1afde3c..278429f9dcb 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -647,8 +647,7 @@ public: settings.ChannelStorage = execCtx.CreateChannelStorage(channelId); } - auto outputChannel = CreateDqOutputChannel(channelId, *taskOutputType, typeEnv, - holderFactory, settings, LogFunc); + auto outputChannel = CreateDqOutputChannel(channelId, *taskOutputType, holderFactory, settings, LogFunc); auto ret = AllocatedHolder->OutputChannels.emplace(channelId, outputChannel); YQL_ENSURE(ret.second, "task: " << TaskId << ", duplicated output channelId: " << channelId); diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp index 2e006b0ce4d..bfcd936c405 100644 --- a/ydb/library/yql/dq/runtime/dq_transport.cpp +++ b/ydb/library/yql/dq/runtime/dq_transport.cpp @@ -44,12 +44,11 @@ void DeserializeValuePickleV1(const TType* type, const NDqProto::TData& data, NU } template<bool Fast> -void DeserializeBufferPickleV1(const NDqProto::TData& data, const TType* itemType, const TTypeEnvironment& typeEnv, +void DeserializeBufferPickleV1(const NDqProto::TData& data, const TType* itemType, const THolderFactory& holderFactory, TUnboxedValueVector& buffer) { - auto listType = TListType::Create(const_cast<TType*>(itemType), typeEnv); using TPacker = TValuePackerTransport<Fast>; - TPacker packer(/* stable */ false, listType); + TPacker packer(/* stable */ false, itemType); packer.UnpackBatch(data.GetRaw(), holderFactory, buffer); } @@ -79,9 +78,9 @@ void TDqDataSerializer::Deserialize(const NDqProto::TData& data, const TType* it switch (data.GetTransportVersion()) { case NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED: case NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0: - return DeserializeBufferPickleV1<false>(data, itemType, TypeEnv, HolderFactory, buffer); + return DeserializeBufferPickleV1<false>(data, itemType, HolderFactory, buffer); case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0: - return DeserializeBufferPickleV1<true>(data, itemType, TypeEnv, HolderFactory, buffer); + return DeserializeBufferPickleV1<true>(data, itemType, HolderFactory, buffer); default: YQL_ENSURE(false, "Unsupported TransportVersion"); } diff --git a/ydb/library/yql/dq/runtime/dq_transport.h b/ydb/library/yql/dq/runtime/dq_transport.h index 36fa0d9b263..b22e947cb78 100644 --- a/ydb/library/yql/dq/runtime/dq_transport.h +++ b/ydb/library/yql/dq/runtime/dq_transport.h @@ -26,16 +26,15 @@ public: template <class TForwardIterator> NDqProto::TData Serialize(TForwardIterator first, TForwardIterator last, const NKikimr::NMiniKQL::TType* itemType) const { - const auto listType = NKikimr::NMiniKQL::TListType::Create(const_cast<NKikimr::NMiniKQL::TType*>(itemType), TypeEnv); if (TransportVersion == NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED || TransportVersion == NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0) { - NKikimr::NMiniKQL::TValuePackerTransport<false> packer(listType); + NKikimr::NMiniKQL::TValuePackerTransport<false> packer(itemType); return SerializeBatch(packer, first, last); } if (TransportVersion == NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0) { - NKikimr::NMiniKQL::TValuePackerTransport<true> packer(listType); + NKikimr::NMiniKQL::TValuePackerTransport<true> packer(itemType); return SerializeBatch(packer, first, last); } YQL_ENSURE(false, "Unsupported TransportVersion"); diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp index 5ed7cf03acf..9c375c5d6a4 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp @@ -221,11 +221,14 @@ bool HasOptionalFields(const TType* type) { } } -TPackProperties ScanTypeProperties(const TType* type) { +TPackProperties ScanTypeProperties(const TType* type, bool assumeList) { TPackProperties props; if (HasOptionalFields(type)) { props.Set(EPackProps::UseOptionalMask); } + if (assumeList) { + return props; + } if (type->GetKind() == TType::EKind::Optional) { type = static_cast<const TOptionalType*>(type)->GetItemType(); if (!HasOptionalFields(type)) { @@ -808,7 +811,7 @@ template<bool Fast> TValuePackerGeneric<Fast>::TValuePackerGeneric(bool stable, const TType* type) : Stable_(stable) , Type_(type) - , State_(ScanTypeProperties(Type_)) + , State_(ScanTypeProperties(Type_, false)) { MKQL_ENSURE(!Fast || !Stable_, "Stable mode is not supported"); } @@ -886,7 +889,8 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value) template<bool Fast> TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type) : Type_(type) - , State_(ScanTypeProperties(Type_)) + , State_(ScanTypeProperties(Type_, false)) + , IncrementalState_(ScanTypeProperties(Type_, true)) { MKQL_ENSURE(!stable, "Stable packing is not supported"); } @@ -894,7 +898,8 @@ TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* typ template<bool Fast> TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type) : Type_(type) - , State_(ScanTypeProperties(Type_)) + , State_(ScanTypeProperties(Type_, false)) + , IncrementalState_(ScanTypeProperties(Type_, true)) { } @@ -905,12 +910,10 @@ NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TStringBuf buf, const TH template<bool Fast> void TValuePackerTransport<Fast>::UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueVector& result) const { - MKQL_ENSURE(Type_->IsList(), "UnpackBatch() requires list type"); - - auto& s = State_; + auto& s = IncrementalState_; ui64 len; ui32 topLength; - const TType* itemType = static_cast<const TListType*>(Type_)->GetItemType(); + const TType* itemType = Type_; if constexpr (!Fast) { auto pair = SkipEmbeddedLength<Fast>(buf); topLength = pair.first; @@ -951,20 +954,19 @@ const TPagedBuffer& TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValueP template<bool Fast> TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddItem(const NUdf::TUnboxedValuePod& value) { - MKQL_ENSURE(Type_->IsList(), "AddItem() is only supported for list types"); - const TType* itemType = static_cast<const TListType*>(Type_)->GetItemType(); + const TType* itemType = Type_; if (!ItemCount_) { Buffer_.Clear(); if constexpr (Fast) { // reserve place for list item count Buffer_.ReserveHeader(sizeof(ItemCount_)); } else { - State_.OptionalUsageMask.Reset(); + IncrementalState_.OptionalUsageMask.Reset(); Buffer_.ReserveHeader(sizeof(ui32) + State_.OptionalMaskReserve + MAX_PACKED64_SIZE); } } - PackImpl<Fast, false>(itemType, Buffer_, value, State_); + PackImpl<Fast, false>(itemType, Buffer_, value, IncrementalState_); ++ItemCount_; return *this; } @@ -977,7 +979,6 @@ void TValuePackerTransport<Fast>::Clear() { template<bool Fast> const TPagedBuffer& TValuePackerTransport<Fast>::Finish() { - MKQL_ENSURE(Type_->IsList(), "Finish() is only supported for list types"); if constexpr (Fast) { char* dst = Buffer_.Header(sizeof(ItemCount_)); Y_VERIFY_DEBUG(dst); @@ -1000,7 +1001,7 @@ void TValuePackerTransport<Fast>::BuildMeta(bool addItemCount) const { const size_t itemCountSize = addItemCount ? GetPack64Length(ItemCount_) : 0; const size_t packedSize = Buffer_.Size() + itemCountSize; - auto& s = State_; + auto& s = addItemCount ? IncrementalState_ : State_; const bool useMask = s.Properties.Test(EPackProps::UseOptionalMask); const size_t maskSize = useMask ? s.OptionalUsageMask.CalcSerializedSize() : 0; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h index cc4fabc071d..16c7d423f18 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h @@ -72,11 +72,11 @@ class TValuePackerTransport { public: using TSelf = TValuePackerTransport<Fast>; - explicit TValuePackerTransport(const TType *type); + explicit TValuePackerTransport(const TType* type); // for compatibility with TValuePackerGeneric - stable packing is not supported - TValuePackerTransport(bool stable, const TType *type); + TValuePackerTransport(bool stable, const TType* type); - // incremental packing - works only for List<T> type + // AddItem()/UnpackBatch() will perform incremental packing - type T is processed as list item type. Will produce List<T> layout TSelf& AddItem(const NUdf::TUnboxedValuePod& value); size_t PackedSizeEstimate() const { return Buffer_.Size() + Buffer_.ReservedHeaderSize(); @@ -85,8 +85,9 @@ public: const TPagedBuffer& Finish(); TPagedBuffer FinishAndPull(); + // Pack()/Unpack() will pack/unpack single value of type T // reference is valid till the next call to Pack() - const TPagedBuffer& Pack(const NUdf::TUnboxedValuePod& value) const; + const TPagedBuffer& Pack(const NUdf::TUnboxedValuePod &value) const; NUdf::TUnboxedValue Unpack(TStringBuf buf, const THolderFactory& holderFactory) const; void UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueVector& result) const; private: @@ -96,6 +97,7 @@ private: ui64 ItemCount_ = 0; mutable TPagedBuffer Buffer_; mutable NDetails::TPackerState State_; + mutable NDetails::TPackerState IncrementalState_; }; using TValuePacker = TValuePackerGeneric<false>; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp index 0ad599e91d7..1ceb8ec4047 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp @@ -524,7 +524,8 @@ protected: if constexpr (Transport) { auto itemType = PgmBuilder.NewDataType(NUdf::TDataType<char *>::Id); auto listType = PgmBuilder.NewListType(itemType); - TValuePackerType packer(false, listType); + TValuePackerType packer(false, itemType); + TValuePackerType listPacker(false, listType); TStringBuf str = "01234567890ABCDEF"; @@ -538,7 +539,7 @@ protected: TString serializedStr; packer.Finish().CopyTo(serializedStr); - auto listObj = packer.Unpack(serializedStr, HolderFactory); + auto listObj = listPacker.Unpack(serializedStr, HolderFactory); UNIT_ASSERT_VALUES_EQUAL(listObj.GetListLength(), count); ui32 i = 0; const auto iter = listObj.GetListIterator(); |