aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-05-26 19:25:46 +0300
committeraneporada <aneporada@ydb.tech>2023-05-26 19:25:46 +0300
commitb9ecca1fb6edc7910b784daa6eb8e24a67325e6e (patch)
tree6ba5c7a2a75d32cfb439ebb26f9c47e9e90302de
parent7fb69b50f57423be98cc6d9b9cd15eb341080c59 (diff)
downloadydb-b9ecca1fb6edc7910b784daa6eb8e24a67325e6e.tar.gz
Do not create minikql types in serialization functions
initial
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel.cpp10
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel.h2
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp14
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp3
-rw-r--r--ydb/library/yql/dq/runtime/dq_transport.cpp9
-rw-r--r--ydb/library/yql/dq/runtime/dq_transport.h5
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp29
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack.h10
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp5
9 files changed, 44 insertions, 43 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index 08864c6556d..940e899211f 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -48,13 +48,13 @@ using namespace NKikimr;
template<bool FastPack>
class TDqOutputChannel : public IDqOutputChannel {
public:
- TDqOutputChannel(ui64 channelId, NMiniKQL::TType* outputType, const NMiniKQL::TTypeEnvironment& typeEnv,
+ TDqOutputChannel(ui64 channelId, NMiniKQL::TType* outputType,
const NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& logFunc)
: ChannelId(channelId)
, OutputType(outputType)
, BasicStats(ChannelId)
, ProfileStats(settings.CollectProfileStats ? &BasicStats : nullptr)
- , Packer(false, NMiniKQL::TListType::Create(OutputType, typeEnv))
+ , Packer(OutputType)
, Storage(settings.ChannelStorage)
, HolderFactory(holderFactory)
, TransportVersion(settings.TransportVersion)
@@ -363,17 +363,17 @@ private:
IDqOutputChannel::TPtr CreateDqOutputChannel(ui64 channelId, NKikimr::NMiniKQL::TType* outputType,
- const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const TDqOutputChannelSettings& settings, const TLogFunc& logFunc)
{
if (settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0 ||
settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED)
{
- return new TDqOutputChannel<false>(channelId, outputType, typeEnv, holderFactory, settings, logFunc);
+ return new TDqOutputChannel<false>(channelId, outputType, holderFactory, settings, logFunc);
} else {
YQL_ENSURE(settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0,
"Unsupported transport version " << (ui32)settings.TransportVersion);
- return new TDqOutputChannel<true>(channelId, outputType, typeEnv, holderFactory, settings, logFunc);
+ return new TDqOutputChannel<true>(channelId, outputType, holderFactory, settings, logFunc);
}
}
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.h b/ydb/library/yql/dq/runtime/dq_output_channel.h
index 49b65c2eda7..001ebb9bfc0 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.h
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.h
@@ -83,7 +83,7 @@ struct TDqOutputChannelSettings {
};
IDqOutputChannel::TPtr CreateDqOutputChannel(ui64 channelId, NKikimr::NMiniKQL::TType* outputType,
- const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+ const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const TDqOutputChannelSettings& settings, const TLogFunc& logFunc = {});
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
index fac3102ea74..20800da3223 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
@@ -91,7 +91,7 @@ void TestSingleRead(TTestContext& ctx) {
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log);
+ auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log);
for (i32 i = 0; i < 10; ++i) {
auto row = ctx.CreateRow(i);
@@ -131,7 +131,7 @@ void TestPartialRead(TTestContext& ctx) {
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log);
+ auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log);
for (i32 i = 0; i < 9; ++i) {
auto row = ctx.CreateRow(i);
@@ -184,7 +184,7 @@ void TestOverflow(TTestContext& ctx) {
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log);
+ auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log);
for (i32 i = 0; i < 8; ++i) {
auto row = ctx.CreateRow(i);
@@ -212,7 +212,7 @@ void TestPopAll(TTestContext& ctx) {
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log);
+ auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log);
for (i32 i = 0; i < 50; ++i) {
auto row = ctx.CreateRow(i);
@@ -249,7 +249,7 @@ void TestBigRow(TTestContext& ctx) {
settings.CollectProfileStats = true;
settings.TransportVersion = ctx.TransportVersion;
- auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log);
+ auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log);
{
auto row = ctx.CreateRow(1);
@@ -318,7 +318,7 @@ void TestSpillWithMockStorage(TTestContext& ctx) {
auto storage = MakeIntrusive<TMockChannelStorage>(100'500ul);
settings.ChannelStorage = storage;
- auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log);
+ auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log);
for (i32 i = 0; i < 35; ++i) {
auto row = ctx.CreateRow(i);
@@ -394,7 +394,7 @@ void TestOverflowWithMockStorage(TTestContext& ctx) {
auto storage = MakeIntrusive<TMockChannelStorage>(500ul);
settings.ChannelStorage = storage;
- auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.TypeEnv, ctx.HolderFactory, settings, Log);
+ auto ch = CreateDqOutputChannel(1, ctx.OutputType, ctx.HolderFactory, settings, Log);
for (i32 i = 0; i < 42; ++i) {
auto row = ctx.CreateRow(i);
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index c31f1afde3c..278429f9dcb 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -647,8 +647,7 @@ public:
settings.ChannelStorage = execCtx.CreateChannelStorage(channelId);
}
- auto outputChannel = CreateDqOutputChannel(channelId, *taskOutputType, typeEnv,
- holderFactory, settings, LogFunc);
+ auto outputChannel = CreateDqOutputChannel(channelId, *taskOutputType, holderFactory, settings, LogFunc);
auto ret = AllocatedHolder->OutputChannels.emplace(channelId, outputChannel);
YQL_ENSURE(ret.second, "task: " << TaskId << ", duplicated output channelId: " << channelId);
diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp
index 2e006b0ce4d..bfcd936c405 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.cpp
+++ b/ydb/library/yql/dq/runtime/dq_transport.cpp
@@ -44,12 +44,11 @@ void DeserializeValuePickleV1(const TType* type, const NDqProto::TData& data, NU
}
template<bool Fast>
-void DeserializeBufferPickleV1(const NDqProto::TData& data, const TType* itemType, const TTypeEnvironment& typeEnv,
+void DeserializeBufferPickleV1(const NDqProto::TData& data, const TType* itemType,
const THolderFactory& holderFactory, TUnboxedValueVector& buffer)
{
- auto listType = TListType::Create(const_cast<TType*>(itemType), typeEnv);
using TPacker = TValuePackerTransport<Fast>;
- TPacker packer(/* stable */ false, listType);
+ TPacker packer(/* stable */ false, itemType);
packer.UnpackBatch(data.GetRaw(), holderFactory, buffer);
}
@@ -79,9 +78,9 @@ void TDqDataSerializer::Deserialize(const NDqProto::TData& data, const TType* it
switch (data.GetTransportVersion()) {
case NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED:
case NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0:
- return DeserializeBufferPickleV1<false>(data, itemType, TypeEnv, HolderFactory, buffer);
+ return DeserializeBufferPickleV1<false>(data, itemType, HolderFactory, buffer);
case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0:
- return DeserializeBufferPickleV1<true>(data, itemType, TypeEnv, HolderFactory, buffer);
+ return DeserializeBufferPickleV1<true>(data, itemType, HolderFactory, buffer);
default:
YQL_ENSURE(false, "Unsupported TransportVersion");
}
diff --git a/ydb/library/yql/dq/runtime/dq_transport.h b/ydb/library/yql/dq/runtime/dq_transport.h
index 36fa0d9b263..b22e947cb78 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.h
+++ b/ydb/library/yql/dq/runtime/dq_transport.h
@@ -26,16 +26,15 @@ public:
template <class TForwardIterator>
NDqProto::TData Serialize(TForwardIterator first, TForwardIterator last, const NKikimr::NMiniKQL::TType* itemType) const {
- const auto listType = NKikimr::NMiniKQL::TListType::Create(const_cast<NKikimr::NMiniKQL::TType*>(itemType), TypeEnv);
if (TransportVersion == NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED ||
TransportVersion == NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0)
{
- NKikimr::NMiniKQL::TValuePackerTransport<false> packer(listType);
+ NKikimr::NMiniKQL::TValuePackerTransport<false> packer(itemType);
return SerializeBatch(packer, first, last);
}
if (TransportVersion == NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0) {
- NKikimr::NMiniKQL::TValuePackerTransport<true> packer(listType);
+ NKikimr::NMiniKQL::TValuePackerTransport<true> packer(itemType);
return SerializeBatch(packer, first, last);
}
YQL_ENSURE(false, "Unsupported TransportVersion");
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
index 5ed7cf03acf..9c375c5d6a4 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
@@ -221,11 +221,14 @@ bool HasOptionalFields(const TType* type) {
}
}
-TPackProperties ScanTypeProperties(const TType* type) {
+TPackProperties ScanTypeProperties(const TType* type, bool assumeList) {
TPackProperties props;
if (HasOptionalFields(type)) {
props.Set(EPackProps::UseOptionalMask);
}
+ if (assumeList) {
+ return props;
+ }
if (type->GetKind() == TType::EKind::Optional) {
type = static_cast<const TOptionalType*>(type)->GetItemType();
if (!HasOptionalFields(type)) {
@@ -808,7 +811,7 @@ template<bool Fast>
TValuePackerGeneric<Fast>::TValuePackerGeneric(bool stable, const TType* type)
: Stable_(stable)
, Type_(type)
- , State_(ScanTypeProperties(Type_))
+ , State_(ScanTypeProperties(Type_, false))
{
MKQL_ENSURE(!Fast || !Stable_, "Stable mode is not supported");
}
@@ -886,7 +889,8 @@ TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value)
template<bool Fast>
TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type)
: Type_(type)
- , State_(ScanTypeProperties(Type_))
+ , State_(ScanTypeProperties(Type_, false))
+ , IncrementalState_(ScanTypeProperties(Type_, true))
{
MKQL_ENSURE(!stable, "Stable packing is not supported");
}
@@ -894,7 +898,8 @@ TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* typ
template<bool Fast>
TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type)
: Type_(type)
- , State_(ScanTypeProperties(Type_))
+ , State_(ScanTypeProperties(Type_, false))
+ , IncrementalState_(ScanTypeProperties(Type_, true))
{
}
@@ -905,12 +910,10 @@ NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TStringBuf buf, const TH
template<bool Fast>
void TValuePackerTransport<Fast>::UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueVector& result) const {
- MKQL_ENSURE(Type_->IsList(), "UnpackBatch() requires list type");
-
- auto& s = State_;
+ auto& s = IncrementalState_;
ui64 len;
ui32 topLength;
- const TType* itemType = static_cast<const TListType*>(Type_)->GetItemType();
+ const TType* itemType = Type_;
if constexpr (!Fast) {
auto pair = SkipEmbeddedLength<Fast>(buf);
topLength = pair.first;
@@ -951,20 +954,19 @@ const TPagedBuffer& TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValueP
template<bool Fast>
TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddItem(const NUdf::TUnboxedValuePod& value) {
- MKQL_ENSURE(Type_->IsList(), "AddItem() is only supported for list types");
- const TType* itemType = static_cast<const TListType*>(Type_)->GetItemType();
+ const TType* itemType = Type_;
if (!ItemCount_) {
Buffer_.Clear();
if constexpr (Fast) {
// reserve place for list item count
Buffer_.ReserveHeader(sizeof(ItemCount_));
} else {
- State_.OptionalUsageMask.Reset();
+ IncrementalState_.OptionalUsageMask.Reset();
Buffer_.ReserveHeader(sizeof(ui32) + State_.OptionalMaskReserve + MAX_PACKED64_SIZE);
}
}
- PackImpl<Fast, false>(itemType, Buffer_, value, State_);
+ PackImpl<Fast, false>(itemType, Buffer_, value, IncrementalState_);
++ItemCount_;
return *this;
}
@@ -977,7 +979,6 @@ void TValuePackerTransport<Fast>::Clear() {
template<bool Fast>
const TPagedBuffer& TValuePackerTransport<Fast>::Finish() {
- MKQL_ENSURE(Type_->IsList(), "Finish() is only supported for list types");
if constexpr (Fast) {
char* dst = Buffer_.Header(sizeof(ItemCount_));
Y_VERIFY_DEBUG(dst);
@@ -1000,7 +1001,7 @@ void TValuePackerTransport<Fast>::BuildMeta(bool addItemCount) const {
const size_t itemCountSize = addItemCount ? GetPack64Length(ItemCount_) : 0;
const size_t packedSize = Buffer_.Size() + itemCountSize;
- auto& s = State_;
+ auto& s = addItemCount ? IncrementalState_ : State_;
const bool useMask = s.Properties.Test(EPackProps::UseOptionalMask);
const size_t maskSize = useMask ? s.OptionalUsageMask.CalcSerializedSize() : 0;
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
index cc4fabc071d..16c7d423f18 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
@@ -72,11 +72,11 @@ class TValuePackerTransport {
public:
using TSelf = TValuePackerTransport<Fast>;
- explicit TValuePackerTransport(const TType *type);
+ explicit TValuePackerTransport(const TType* type);
// for compatibility with TValuePackerGeneric - stable packing is not supported
- TValuePackerTransport(bool stable, const TType *type);
+ TValuePackerTransport(bool stable, const TType* type);
- // incremental packing - works only for List<T> type
+ // AddItem()/UnpackBatch() will perform incremental packing - type T is processed as list item type. Will produce List<T> layout
TSelf& AddItem(const NUdf::TUnboxedValuePod& value);
size_t PackedSizeEstimate() const {
return Buffer_.Size() + Buffer_.ReservedHeaderSize();
@@ -85,8 +85,9 @@ public:
const TPagedBuffer& Finish();
TPagedBuffer FinishAndPull();
+ // Pack()/Unpack() will pack/unpack single value of type T
// reference is valid till the next call to Pack()
- const TPagedBuffer& Pack(const NUdf::TUnboxedValuePod& value) const;
+ const TPagedBuffer& Pack(const NUdf::TUnboxedValuePod &value) const;
NUdf::TUnboxedValue Unpack(TStringBuf buf, const THolderFactory& holderFactory) const;
void UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueVector& result) const;
private:
@@ -96,6 +97,7 @@ private:
ui64 ItemCount_ = 0;
mutable TPagedBuffer Buffer_;
mutable NDetails::TPackerState State_;
+ mutable NDetails::TPackerState IncrementalState_;
};
using TValuePacker = TValuePackerGeneric<false>;
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp
index 0ad599e91d7..1ceb8ec4047 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_ut.cpp
@@ -524,7 +524,8 @@ protected:
if constexpr (Transport) {
auto itemType = PgmBuilder.NewDataType(NUdf::TDataType<char *>::Id);
auto listType = PgmBuilder.NewListType(itemType);
- TValuePackerType packer(false, listType);
+ TValuePackerType packer(false, itemType);
+ TValuePackerType listPacker(false, listType);
TStringBuf str = "01234567890ABCDEF";
@@ -538,7 +539,7 @@ protected:
TString serializedStr;
packer.Finish().CopyTo(serializedStr);
- auto listObj = packer.Unpack(serializedStr, HolderFactory);
+ auto listObj = listPacker.Unpack(serializedStr, HolderFactory);
UNIT_ASSERT_VALUES_EQUAL(listObj.GetListLength(), count);
ui32 i = 0;
const auto iter = listObj.GetListIterator();