aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraneporada <aneporada@ydb.tech>2023-04-06 11:04:28 +0300
committeraneporada <aneporada@ydb.tech>2023-04-06 11:04:28 +0300
commitca65d8a1a697b77fb1452a3bb072ac3628fc8af9 (patch)
treec6bc828b6ef7adf553e647eb04da00a394493fd1
parent6c00a004990fdcd29f8a065d0f08c6a17d7999b2 (diff)
downloadydb-ca65d8a1a697b77fb1452a3bb072ac3628fc8af9.tar.gz
Drop unused DATA_TRANSPORT_ARROW_1_0 serialization/transport encoding
initial
-rw-r--r--ydb/library/yql/dq/proto/dq_transport.proto2
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp52
-rw-r--r--ydb/library/yql/dq/runtime/dq_transport.cpp53
-rw-r--r--ydb/library/yql/dq/runtime/dq_transport.h4
-rw-r--r--ydb/library/yql/providers/dq/actors/proto_builder.cpp18
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp20
6 files changed, 5 insertions, 144 deletions
diff --git a/ydb/library/yql/dq/proto/dq_transport.proto b/ydb/library/yql/dq/proto/dq_transport.proto
index 45995f43411..51ef6e33119 100644
--- a/ydb/library/yql/dq/proto/dq_transport.proto
+++ b/ydb/library/yql/dq/proto/dq_transport.proto
@@ -7,7 +7,7 @@ enum EDataTransportVersion {
DATA_TRANSPORT_VERSION_UNSPECIFIED = 0;
DATA_TRANSPORT_YSON_1_0 = 10000;
DATA_TRANSPORT_UV_PICKLE_1_0 = 20000;
- DATA_TRANSPORT_ARROW_1_0 = 30000;
+ DATA_TRANSPORT_VERSION_RESERVED = 30000;
DATA_TRANSPORT_UV_FAST_PICKLE_1_0 = 40000;
}
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
index dcb84f9610d..db595fb4c24 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
@@ -460,16 +460,6 @@ Y_UNIT_TEST(SingleReadQ) {
TestSingleRead(ctx, true);
}
-Y_UNIT_TEST(SingleReadWithArrow) {
- TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0);
- TestSingleRead(ctx, false);
-}
-
-Y_UNIT_TEST(SingleReadWithArrowQ) {
- TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0);
- TestSingleRead(ctx, true);
-}
-
Y_UNIT_TEST(PartialRead) {
TTestContext ctx;
TestPartialRead(ctx, false);
@@ -480,17 +470,6 @@ Y_UNIT_TEST(PartialReadQ) {
TestPartialRead(ctx, true);
}
-// too heavy messages...
-//Y_UNIT_TEST(PartialReadWithArrow) {
-// TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0);
-// TestPartialRead(ctx, false);
-//}
-//
-//Y_UNIT_TEST(PartialReadWithArrowQ) {
-// TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0);
-// TestPartialRead(ctx, true);
-//}
-
Y_UNIT_TEST(Overflow) {
TTestContext ctx;
TestOverflow(ctx, false);
@@ -501,16 +480,6 @@ Y_UNIT_TEST(OverflowQ) {
TestOverflow(ctx, true);
}
-Y_UNIT_TEST(OverflowWithArrow) {
- TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0);
- TestOverflow(ctx, false);
-}
-
-Y_UNIT_TEST(OverflowWithArrowQ) {
- TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0);
- TestOverflow(ctx, true);
-}
-
Y_UNIT_TEST(PopAll) {
TTestContext ctx;
TestPopAll(ctx, false);
@@ -521,16 +490,6 @@ Y_UNIT_TEST(PopAllQ) {
TestPopAll(ctx, true);
}
-Y_UNIT_TEST(PopAllWithArrow) {
- TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0);
- TestPopAll(ctx, false);
-}
-
-Y_UNIT_TEST(PopAllWithArrowQ) {
- TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0);
- TestPopAll(ctx, true);
-}
-
Y_UNIT_TEST(BigRow) {
TTestContext ctx(NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0, true);
TestBigRow(ctx, false);
@@ -550,20 +509,9 @@ Y_UNIT_TEST(Spill) {
TestSpillWithMockStorage(ctx);
}
-// Fail because arrow serialization has a big overhead
-// Y_UNIT_TEST(SpillWithArrow) {
-// TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0);
-// TestSpillWithMockStorage(ctx);
-// }
-
Y_UNIT_TEST(Overflow) {
TTestContext ctx;
TestOverflowWithMockStorage(ctx);
}
-// Fail because arrow serialization has a big overhead
-// Y_UNIT_TEST(OverflowWithArrow) {
-// TTestContext ctx(NDqProto::DATA_TRANSPORT_ARROW_1_0);
-// TestOverflowWithMockStorage(ctx);
-// }
}
diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp
index afa70b5f4e1..54306d874ed 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.cpp
+++ b/ydb/library/yql/dq/runtime/dq_transport.cpp
@@ -1,5 +1,4 @@
#include "dq_transport.h"
-#include "dq_arrow_helpers.h"
#include <ydb/library/mkql_proto/mkql_proto.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
@@ -19,11 +18,6 @@ using namespace NYql;
namespace {
-NDqProto::TData SerializeBufferArrowV1(TUnboxedValueVector& buffer, const TType* itemType);
-
-void DeserializeBufferArrowV1(const NDqProto::TData& data, const TType* itemType,
- const THolderFactory& holderFactory, TUnboxedValueVector& buffer);
-
template<bool Fast>
NDqProto::TData SerializeValuePickleV1(const TType* type, const NUdf::TUnboxedValuePod& value) {
using TPacker = std::conditional_t<Fast, TValuePackerFast, TValuePacker>;
@@ -39,12 +33,6 @@ NDqProto::TData SerializeValuePickleV1(const TType* type, const NUdf::TUnboxedVa
return data;
}
-NDqProto::TData SerializeValueArrowV1(const TType* type, const NUdf::TUnboxedValuePod& value) {
- TUnboxedValueVector buffer;
- buffer.push_back(value);
- return SerializeBufferArrowV1(buffer, type);
-}
-
template<bool Fast>
void DeserializeValuePickleV1(const TType* type, const NDqProto::TData& data, NUdf::TUnboxedValue& value,
const THolderFactory& holderFactory)
@@ -55,14 +43,6 @@ void DeserializeValuePickleV1(const TType* type, const NDqProto::TData& data, NU
value = packer.Unpack(data.GetRaw(), holderFactory);
}
-void DeserializeValueArrowV1(const TType* type, const NDqProto::TData& data, NUdf::TUnboxedValue& value,
- const THolderFactory& holderFactory)
-{
- TUnboxedValueVector buffer;
- DeserializeBufferArrowV1(data, type, holderFactory, buffer);
- value = buffer[0];
-}
-
template<bool Fast>
NDqProto::TData SerializeBufferPickleV1(TUnboxedValueVector& buffer, const TType* itemType,
const TTypeEnvironment& typeEnv, const THolderFactory& holderFactory)
@@ -75,18 +55,6 @@ NDqProto::TData SerializeBufferPickleV1(TUnboxedValueVector& buffer, const TType
return data;
}
-NDqProto::TData SerializeBufferArrowV1(TUnboxedValueVector& buffer, const TType* itemType) {
- auto array = NArrow::MakeArray(buffer, itemType);
-
- auto serialized = NArrow::SerializeArray(array);
-
- NDqProto::TData data;
- data.SetTransportVersion(NDqProto::DATA_TRANSPORT_ARROW_1_0);
- data.SetRaw(serialized.data(), serialized.size());
- data.SetRows(buffer.size());
- return data;
-}
-
template<bool Fast>
void DeserializeBufferPickleV1(const NDqProto::TData& data, const TType* itemType, const TTypeEnvironment& typeEnv,
const THolderFactory& holderFactory, TUnboxedValueVector& buffer)
@@ -102,19 +70,6 @@ void DeserializeBufferPickleV1(const NDqProto::TData& data, const TType* itemTyp
}
}
-void DeserializeBufferArrowV1(const NDqProto::TData& data, const TType* itemType, const THolderFactory& holderFactory,
- TUnboxedValueVector& buffer)
-{
- YQL_ENSURE(data.GetTransportVersion() == (ui32) NDqProto::DATA_TRANSPORT_ARROW_1_0);
-
- auto array = NArrow::DeserializeArray(data.GetRaw(), NArrow::GetArrowType(itemType));
- YQL_ENSURE(array->length() == data.GetRows());
- auto newElements = NArrow::ExtractUnboxedValues(array, itemType, holderFactory);
- for (NUdf::TUnboxedValue item: newElements) {
- buffer.emplace_back(std::move(item));
- }
-}
-
void DeserializeParamV1(const NDqProto::TData& data, const TType* type, const THolderFactory& holderFactory,
NUdf::TUnboxedValue& value)
{
@@ -134,8 +89,6 @@ NDqProto::TData TDqDataSerializer::Serialize(const NUdf::TUnboxedValue& value, c
return SerializeValuePickleV1<false>(itemType, value);
case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0:
return SerializeValuePickleV1<true>(itemType, value);
- case NDqProto::DATA_TRANSPORT_ARROW_1_0:
- return SerializeValueArrowV1(itemType, value);
default:
YQL_ENSURE(false, "Unsupported TransportVersion");
}
@@ -148,8 +101,6 @@ NDqProto::TData TDqDataSerializer::Serialize(TUnboxedValueVector& buffer, const
return SerializeBufferPickleV1<false>(buffer, itemType, TypeEnv, HolderFactory);
case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0:
return SerializeBufferPickleV1<true>(buffer, itemType, TypeEnv, HolderFactory);
- case NDqProto::DATA_TRANSPORT_ARROW_1_0:
- return SerializeBufferArrowV1(buffer, itemType);
default:
YQL_ENSURE(false, "Unsupported TransportVersion");
}
@@ -164,8 +115,6 @@ void TDqDataSerializer::Deserialize(const NDqProto::TData& data, const TType* it
return DeserializeBufferPickleV1<false>(data, itemType, TypeEnv, HolderFactory, buffer);
case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0:
return DeserializeBufferPickleV1<true>(data, itemType, TypeEnv, HolderFactory, buffer);
- case NDqProto::DATA_TRANSPORT_ARROW_1_0:
- return DeserializeBufferArrowV1(data, itemType, HolderFactory, buffer);
default:
YQL_ENSURE(false, "Unsupported TransportVersion");
}
@@ -180,8 +129,6 @@ void TDqDataSerializer::Deserialize(const NDqProto::TData& data, const TType* it
return DeserializeValuePickleV1<false>(itemType, data, value, HolderFactory);
case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0:
return DeserializeValuePickleV1<true>(itemType, data, value, HolderFactory);
- case NDqProto::DATA_TRANSPORT_ARROW_1_0:
- DeserializeValueArrowV1(itemType, data, value, HolderFactory);
default:
YQL_ENSURE(false, "Unsupported TransportVersion");
}
diff --git a/ydb/library/yql/dq/runtime/dq_transport.h b/ydb/library/yql/dq/runtime/dq_transport.h
index a815970b9f6..c024dccff70 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.h
+++ b/ydb/library/yql/dq/runtime/dq_transport.h
@@ -40,10 +40,6 @@ public:
data.SetRows(count);
return data;
}
- case NDqProto::DATA_TRANSPORT_ARROW_1_0: {
- NKikimr::NMiniKQL::TUnboxedValueVector buffer(first, last);
- return Serialize(buffer, itemType);
- }
default:
YQL_ENSURE(false, "Unsupported TransportVersion");
}
diff --git a/ydb/library/yql/providers/dq/actors/proto_builder.cpp b/ydb/library/yql/providers/dq/actors/proto_builder.cpp
index d4a3ae90714..cafacb757c1 100644
--- a/ydb/library/yql/providers/dq/actors/proto_builder.cpp
+++ b/ydb/library/yql/providers/dq/actors/proto_builder.cpp
@@ -40,20 +40,6 @@ TVector<ui32> BuildColumnOrder(const TVector<TString>& columns, NKikimr::NMiniKQ
return columnOrder;
}
-NDqProto::EDataTransportVersion GetTransportVersion(const NYql::NDqProto::TData& data) {
- switch (data.GetTransportVersion()) {
- case 10000:
- return NDqProto::EDataTransportVersion::DATA_TRANSPORT_YSON_1_0;
- case 20000:
- return NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0;
- case 30000:
- return NDqProto::EDataTransportVersion::DATA_TRANSPORT_ARROW_1_0;
- default:
- break;
- }
- return NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED;
-}
-
} // unnamed
TProtoBuilder::TProtoBuilder(const TString& type, const TVector<TString>& columns)
@@ -107,7 +93,7 @@ bool TProtoBuilder::WriteData(const NDqProto::TData& data, const std::function<b
TMemoryUsageInfo memInfo("ProtoBuilder");
THolderFactory holderFactory(Alloc.Ref(), memInfo);
- NDqProto::EDataTransportVersion transportVersion = GetTransportVersion(data);
+ const auto transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED;
NDq::TDqDataSerializer dataSerializer(TypeEnv, holderFactory, transportVersion);
TUnboxedValueVector buffer;
@@ -126,7 +112,7 @@ bool TProtoBuilder::WriteData(const TVector<NDqProto::TData>& rows, const std::f
TMemoryUsageInfo memInfo("ProtoBuilder");
THolderFactory holderFactory(Alloc.Ref(), memInfo);
- const auto transportVersion = rows.empty() ? NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED : GetTransportVersion(rows.front());
+ const auto transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED;
NDq::TDqDataSerializer dataSerializer(TypeEnv, holderFactory, transportVersion);
for (const auto& part : rows) {
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index 97ca943186d..18f04fe40a7 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -367,24 +367,8 @@ public:
request.Load(&input);
auto guard = Runner->BindAllocator(0); // Explicitly reset memory limit
- auto transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED;
- switch (request.GetData().GetTransportVersion()) {
- case 10000: {
- transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_YSON_1_0;
- break;
- }
- case 20000: {
- transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0;
- break;
- }
- case 30000: {
- transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_ARROW_1_0;
- break;
- }
- default:
- transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED;
- }
- NDq::TDqDataSerializer dataSerializer(Runner->GetTypeEnv(), Runner->GetHolderFactory(), transportVersion);
+ NDq::TDqDataSerializer dataSerializer(Runner->GetTypeEnv(), Runner->GetHolderFactory(),
+ NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED);
NKikimr::NMiniKQL::TUnboxedValueVector buffer;
buffer.reserve(request.GetData().GetRows());
if (request.GetString().empty() && request.GetChunks() == 0) {