diff options
author | aneporada <aneporada@ydb.tech> | 2023-04-06 11:04:28 +0300 |
---|---|---|
committer | aneporada <aneporada@ydb.tech> | 2023-04-06 11:04:28 +0300 |
commit | ca65d8a1a697b77fb1452a3bb072ac3628fc8af9 (patch) | |
tree | c6bc828b6ef7adf553e647eb04da00a394493fd1 | |
parent | 6c00a004990fdcd29f8a065d0f08c6a17d7999b2 (diff) | |
download | ydb-ca65d8a1a697b77fb1452a3bb072ac3628fc8af9.tar.gz |
Drop unused DATA_TRANSPORT_ARROW_1_0 serialization/transport encoding
initial
-rw-r--r-- | ydb/library/yql/dq/proto/dq_transport.proto | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp | 52 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_transport.cpp | 53 | ||||
-rw-r--r-- | ydb/library/yql/dq/runtime/dq_transport.h | 4 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/actors/proto_builder.cpp | 18 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/runtime/task_command_executor.cpp | 20 |
6 files changed, 5 insertions, 144 deletions
diff --git a/ydb/library/yql/dq/proto/dq_transport.proto b/ydb/library/yql/dq/proto/dq_transport.proto index 45995f43411..51ef6e33119 100644 --- a/ydb/library/yql/dq/proto/dq_transport.proto +++ b/ydb/library/yql/dq/proto/dq_transport.proto @@ -7,7 +7,7 @@ enum EDataTransportVersion { DATA_TRANSPORT_VERSION_UNSPECIFIED = 0; DATA_TRANSPORT_YSON_1_0 = 10000; DATA_TRANSPORT_UV_PICKLE_1_0 = 20000; - DATA_TRANSPORT_ARROW_1_0 = 30000; + DATA_TRANSPORT_VERSION_RESERVED = 30000; DATA_TRANSPORT_UV_FAST_PICKLE_1_0 = 40000; } diff --git a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp index dcb84f9610d..db595fb4c24 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp @@ -460,16 +460,6 @@ Y_UNIT_TEST(SingleReadQ) { TestSingleRead(ctx, true); } -Y_UNIT_TEST(SingleReadWithArrow) { - TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0); - TestSingleRead(ctx, false); -} - -Y_UNIT_TEST(SingleReadWithArrowQ) { - TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0); - TestSingleRead(ctx, true); -} - Y_UNIT_TEST(PartialRead) { TTestContext ctx; TestPartialRead(ctx, false); @@ -480,17 +470,6 @@ Y_UNIT_TEST(PartialReadQ) { TestPartialRead(ctx, true); } -// too heavy messages... -//Y_UNIT_TEST(PartialReadWithArrow) { -// TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0); -// TestPartialRead(ctx, false); -//} -// -//Y_UNIT_TEST(PartialReadWithArrowQ) { -// TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0); -// TestPartialRead(ctx, true); -//} - Y_UNIT_TEST(Overflow) { TTestContext ctx; TestOverflow(ctx, false); @@ -501,16 +480,6 @@ Y_UNIT_TEST(OverflowQ) { TestOverflow(ctx, true); } -Y_UNIT_TEST(OverflowWithArrow) { - TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0); - TestOverflow(ctx, false); -} - -Y_UNIT_TEST(OverflowWithArrowQ) { - TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0); - TestOverflow(ctx, true); -} - Y_UNIT_TEST(PopAll) { TTestContext ctx; TestPopAll(ctx, false); @@ -521,16 +490,6 @@ Y_UNIT_TEST(PopAllQ) { TestPopAll(ctx, true); } -Y_UNIT_TEST(PopAllWithArrow) { - TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0); - TestPopAll(ctx, false); -} - -Y_UNIT_TEST(PopAllWithArrowQ) { - TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0); - TestPopAll(ctx, true); -} - Y_UNIT_TEST(BigRow) { TTestContext ctx(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true); TestBigRow(ctx, false); @@ -550,20 +509,9 @@ Y_UNIT_TEST(Spill) { TestSpillWithMockStorage(ctx); } -// Fail because arrow serialization has a big overhead -// Y_UNIT_TEST(SpillWithArrow) { -// TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0); -// TestSpillWithMockStorage(ctx); -// } - Y_UNIT_TEST(Overflow) { TTestContext ctx; TestOverflowWithMockStorage(ctx); } -// Fail because arrow serialization has a big overhead -// Y_UNIT_TEST(OverflowWithArrow) { -// TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0); -// TestOverflowWithMockStorage(ctx); -// } } diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp index afa70b5f4e1..54306d874ed 100644 --- a/ydb/library/yql/dq/runtime/dq_transport.cpp +++ b/ydb/library/yql/dq/runtime/dq_transport.cpp @@ -1,5 +1,4 @@ #include "dq_transport.h" -#include "dq_arrow_helpers.h" #include <ydb/library/mkql_proto/mkql_proto.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> @@ -19,11 +18,6 @@ using namespace NYql; namespace { -NDqProto::TData SerializeBufferArrowV1(TUnboxedValueVector& buffer, const TType* itemType); - -void DeserializeBufferArrowV1(const NDqProto::TData& data, const TType* itemType, - const THolderFactory& holderFactory, TUnboxedValueVector& buffer); - template<bool Fast> NDqProto::TData SerializeValuePickleV1(const TType* type, const NUdf::TUnboxedValuePod& value) { using TPacker = std::conditional_t<Fast, TValuePackerFast, TValuePacker>; @@ -39,12 +33,6 @@ NDqProto::TData SerializeValuePickleV1(const TType* type, const NUdf::TUnboxedVa return data; } -NDqProto::TData SerializeValueArrowV1(const TType* type, const NUdf::TUnboxedValuePod& value) { - TUnboxedValueVector buffer; - buffer.push_back(value); - return SerializeBufferArrowV1(buffer, type); -} - template<bool Fast> void DeserializeValuePickleV1(const TType* type, const NDqProto::TData& data, NUdf::TUnboxedValue& value, const THolderFactory& holderFactory) @@ -55,14 +43,6 @@ void DeserializeValuePickleV1(const TType* type, const NDqProto::TData& data, NU value = packer.Unpack(data.GetRaw(), holderFactory); } -void DeserializeValueArrowV1(const TType* type, const NDqProto::TData& data, NUdf::TUnboxedValue& value, - const THolderFactory& holderFactory) -{ - TUnboxedValueVector buffer; - DeserializeBufferArrowV1(data, type, holderFactory, buffer); - value = buffer[0]; -} - template<bool Fast> NDqProto::TData SerializeBufferPickleV1(TUnboxedValueVector& buffer, const TType* itemType, const TTypeEnvironment& typeEnv, const THolderFactory& holderFactory) @@ -75,18 +55,6 @@ NDqProto::TData SerializeBufferPickleV1(TUnboxedValueVector& buffer, const TType return data; } -NDqProto::TData SerializeBufferArrowV1(TUnboxedValueVector& buffer, const TType* itemType) { - auto array = NArrow::MakeArray(buffer, itemType); - - auto serialized = NArrow::SerializeArray(array); - - NDqProto::TData data; - data.SetTransportVersion(NDqProto::DATA_TRANSPORT_ARROW_1_0); - data.SetRaw(serialized.data(), serialized.size()); - data.SetRows(buffer.size()); - return data; -} - template<bool Fast> void DeserializeBufferPickleV1(const NDqProto::TData& data, const TType* itemType, const TTypeEnvironment& typeEnv, const THolderFactory& holderFactory, TUnboxedValueVector& buffer) @@ -102,19 +70,6 @@ void DeserializeBufferPickleV1(const NDqProto::TData& data, const TType* itemTyp } } -void DeserializeBufferArrowV1(const NDqProto::TData& data, const TType* itemType, const THolderFactory& holderFactory, - TUnboxedValueVector& buffer) -{ - YQL_ENSURE(data.GetTransportVersion() == (ui32) NDqProto::DATA_TRANSPORT_ARROW_1_0); - - auto array = NArrow::DeserializeArray(data.GetRaw(), NArrow::GetArrowType(itemType)); - YQL_ENSURE(array->length() == data.GetRows()); - auto newElements = NArrow::ExtractUnboxedValues(array, itemType, holderFactory); - for (NUdf::TUnboxedValue item: newElements) { - buffer.emplace_back(std::move(item)); - } -} - void DeserializeParamV1(const NDqProto::TData& data, const TType* type, const THolderFactory& holderFactory, NUdf::TUnboxedValue& value) { @@ -134,8 +89,6 @@ NDqProto::TData TDqDataSerializer::Serialize(const NUdf::TUnboxedValue& value, c return SerializeValuePickleV1<false>(itemType, value); case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0: return SerializeValuePickleV1<true>(itemType, value); - case NDqProto::DATA_TRANSPORT_ARROW_1_0: - return SerializeValueArrowV1(itemType, value); default: YQL_ENSURE(false, "Unsupported TransportVersion"); } @@ -148,8 +101,6 @@ NDqProto::TData TDqDataSerializer::Serialize(TUnboxedValueVector& buffer, const return SerializeBufferPickleV1<false>(buffer, itemType, TypeEnv, HolderFactory); case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0: return SerializeBufferPickleV1<true>(buffer, itemType, TypeEnv, HolderFactory); - case NDqProto::DATA_TRANSPORT_ARROW_1_0: - return SerializeBufferArrowV1(buffer, itemType); default: YQL_ENSURE(false, "Unsupported TransportVersion"); } @@ -164,8 +115,6 @@ void TDqDataSerializer::Deserialize(const NDqProto::TData& data, const TType* it return DeserializeBufferPickleV1<false>(data, itemType, TypeEnv, HolderFactory, buffer); case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0: return DeserializeBufferPickleV1<true>(data, itemType, TypeEnv, HolderFactory, buffer); - case NDqProto::DATA_TRANSPORT_ARROW_1_0: - return DeserializeBufferArrowV1(data, itemType, HolderFactory, buffer); default: YQL_ENSURE(false, "Unsupported TransportVersion"); } @@ -180,8 +129,6 @@ void TDqDataSerializer::Deserialize(const NDqProto::TData& data, const TType* it return DeserializeValuePickleV1<false>(itemType, data, value, HolderFactory); case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0: return DeserializeValuePickleV1<true>(itemType, data, value, HolderFactory); - case NDqProto::DATA_TRANSPORT_ARROW_1_0: - DeserializeValueArrowV1(itemType, data, value, HolderFactory); default: YQL_ENSURE(false, "Unsupported TransportVersion"); } diff --git a/ydb/library/yql/dq/runtime/dq_transport.h b/ydb/library/yql/dq/runtime/dq_transport.h index a815970b9f6..c024dccff70 100644 --- a/ydb/library/yql/dq/runtime/dq_transport.h +++ b/ydb/library/yql/dq/runtime/dq_transport.h @@ -40,10 +40,6 @@ public: data.SetRows(count); return data; } - case NDqProto::DATA_TRANSPORT_ARROW_1_0: { - NKikimr::NMiniKQL::TUnboxedValueVector buffer(first, last); - return Serialize(buffer, itemType); - } default: YQL_ENSURE(false, "Unsupported TransportVersion"); } diff --git a/ydb/library/yql/providers/dq/actors/proto_builder.cpp b/ydb/library/yql/providers/dq/actors/proto_builder.cpp index d4a3ae90714..cafacb757c1 100644 --- a/ydb/library/yql/providers/dq/actors/proto_builder.cpp +++ b/ydb/library/yql/providers/dq/actors/proto_builder.cpp @@ -40,20 +40,6 @@ TVector<ui32> BuildColumnOrder(const TVector<TString>& columns, NKikimr::NMiniKQ return columnOrder; } -NDqProto::EDataTransportVersion GetTransportVersion(const NYql::NDqProto::TData& data) { - switch (data.GetTransportVersion()) { - case 10000: - return NDqProto::EDataTransportVersion::DATA_TRANSPORT_YSON_1_0; - case 20000: - return NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0; - case 30000: - return NDqProto::EDataTransportVersion::DATA_TRANSPORT_ARROW_1_0; - default: - break; - } - return NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED; -} - } // unnamed TProtoBuilder::TProtoBuilder(const TString& type, const TVector<TString>& columns) @@ -107,7 +93,7 @@ bool TProtoBuilder::WriteData(const NDqProto::TData& data, const std::function<b TMemoryUsageInfo memInfo("ProtoBuilder"); THolderFactory holderFactory(Alloc.Ref(), memInfo); - NDqProto::EDataTransportVersion transportVersion = GetTransportVersion(data); + const auto transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED; NDq::TDqDataSerializer dataSerializer(TypeEnv, holderFactory, transportVersion); TUnboxedValueVector buffer; @@ -126,7 +112,7 @@ bool TProtoBuilder::WriteData(const TVector<NDqProto::TData>& rows, const std::f TMemoryUsageInfo memInfo("ProtoBuilder"); THolderFactory holderFactory(Alloc.Ref(), memInfo); - const auto transportVersion = rows.empty() ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED : GetTransportVersion(rows.front()); + const auto transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED; NDq::TDqDataSerializer dataSerializer(TypeEnv, holderFactory, transportVersion); for (const auto& part : rows) { diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index 97ca943186d..18f04fe40a7 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -367,24 +367,8 @@ public: request.Load(&input); auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit - auto transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED; - switch (request.GetData().GetTransportVersion()) { - case 10000: { - transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_YSON_1_0; - break; - } - case 20000: { - transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0; - break; - } - case 30000: { - transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_ARROW_1_0; - break; - } - default: - transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED; - } - NDq::TDqDataSerializer dataSerializer(Runner->GetTypeEnv(), Runner->GetHolderFactory(), transportVersion); + NDq::TDqDataSerializer dataSerializer(Runner->GetTypeEnv(), Runner->GetHolderFactory(), + NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED); NKikimr::NMiniKQL::TUnboxedValueVector buffer; buffer.reserve(request.GetData().GetRows()); if (request.GetString().empty() && request.GetChunks() == 0) { |