summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: aneporada <[email protected]> 2023-07-13 12:54:02 +0300
committer: aneporada <[email protected]> 2023-07-13 12:54:02 +0300
commit: 6b539b3b1dc62f263202e7568e3532fa4c6f8db8 (patch)
tree: 25c2212447550d31b36c9bb2176dcff5dafc6d38
parent: 3e821290327605ba1c4ad4a9cf2ce3b19acb8139 (diff)
Pass data for deserialization by rvalue reference
This way we can:
1) reduce memory usage: serialized data chunks can be freed immediately after being read;
2) support zero-copy deserialization: the resulting unboxed values can hold and own references to serialized memory regions (this is especially helpful for Arrow blocks).
-rw-r--r--ydb/core/fq/libs/actors/result_writer.cpp8
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp8
-rw-r--r--ydb/core/kqp/executer_actor/kqp_literal_executer.cpp3
-rw-r--r--ydb/core/kqp/executer_actor/kqp_result_channel.cpp4
-rw-r--r--ydb/core/kqp/runtime/kqp_transport.cpp4
-rw-r--r--ydb/core/kqp/runtime/kqp_transport.h2
-rw-r--r--ydb/library/yql/dq/common/dq_serialized_batch.h11
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_channel.cpp7
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel.cpp7
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp33
-rw-r--r--ydb/library/yql/dq/runtime/dq_transport.cpp24
-rw-r--r--ydb/library/yql/dq/runtime/dq_transport.h4
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp12
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack.h2
-rw-r--r--ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h21
-rw-r--r--ydb/library/yql/providers/dq/actors/full_result_writer.cpp3
-rw-r--r--ydb/library/yql/providers/dq/actors/proto_builder.cpp22
-rw-r--r--ydb/library/yql/providers/dq/actors/proto_builder.h10
-rw-r--r--ydb/library/yql/providers/dq/actors/result_actor_base.h3
-rw-r--r--ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp2
-rw-r--r--ydb/library/yql/providers/dq/runtime/task_command_executor.cpp2
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp2
23 files changed, 94 insertions, 104 deletions
diff --git a/ydb/core/fq/libs/actors/result_writer.cpp b/ydb/core/fq/libs/actors/result_writer.cpp
index 846cb2bdf6e..51cc08be9d6 100644
--- a/ydb/core/fq/libs/actors/result_writer.cpp
+++ b/ydb/core/fq/libs/actors/result_writer.cpp
@@ -111,7 +111,8 @@ private:
Finished = true;
NYql::NDqProto::TQueryResponse queryResult(ev->Get()->Record);
- *queryResult.MutableYson() = ResultBuilder->BuildYson(Head);
+ *queryResult.MutableYson() = ResultBuilder->BuildYson(std::move(Head));
+ Head.clear();
if (!Issues.Empty()) {
IssuesToMessage(Issues, queryResult.MutableIssues());
}
@@ -271,8 +272,7 @@ private:
return;
}
- TVector<NDq::TDqSerializedBatch> batch(1);
- NDq::TDqSerializedBatch& data = batch.front();
+ NDq::TDqSerializedBatch data;
data.Proto = std::move(*ev->Get()->Record.MutableChannelData()->MutableData());
if (data.Proto.HasPayloadId()) {
data.Payload = ev->Get()->GetPayload(data.Proto.GetPayloadId());
@@ -280,7 +280,7 @@ private:
FreeSpace -= data.Size();
OccupiedSpace += data.Size();
- auto resultSet = ResultBuilder->BuildResultSet(batch);
+ auto resultSet = ResultBuilder->BuildResultSet({ data });
if (OccupiedSpace > ResultBytesLimit) {
TIssues issues;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index 5d6879e69ce..1cc61beaf86 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -37,8 +37,8 @@ struct TEvKqpExecuter {
TVector<TKqpPhyTxHolder::TConstPtr>& GetTxHolders() { return TxHolders; }
TVector<TKqpExecuterTxResult>& GetTxResults() { return TxResults; }
void InitTxResult(const TKqpPhyTxHolder::TConstPtr& tx);
- void TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnboxedValueVector& rows);
- void TakeResult(ui32 idx, const NYql::NDq::TDqSerializedBatch& rows);
+ void TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnboxedValueVector&& rows);
+ void TakeResult(ui32 idx, NYql::NDq::TDqSerializedBatch&& rows);
ui64 GetResultRowsCount() const {
return ResultRowsCount;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 18265decf21..5be1366333c 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -28,7 +28,7 @@ void TEvKqpExecuter::TEvTxResponse::InitTxResult(const TKqpPhyTxHolder::TConstPt
}
}
-void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, const NDq::TDqSerializedBatch& rows) {
+void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NDq::TDqSerializedBatch&& rows) {
YQL_ENSURE(idx < TxResults.size());
ResultRowsCount += rows.RowCount();
ResultRowsBytes += rows.Size();
@@ -38,7 +38,7 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, const NDq::TDqSerialize
NDq::TDqDataSerializer dataSerializer(
AllocState->TypeEnv, AllocState->HolderFactory,
static_cast<NDqProto::EDataTransportVersion>(rows.Proto.GetTransportVersion()));
- dataSerializer.Deserialize(rows, result.MkqlItemType, result.Rows);
+ dataSerializer.Deserialize(std::move(rows), result.MkqlItemType, result.Rows);
}
}
@@ -50,7 +50,7 @@ TEvKqpExecuter::TEvTxResponse::~TEvTxResponse() {
}
}
-void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnboxedValueVector& rows) {
+void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnboxedValueVector&& rows) {
YQL_ENSURE(idx < TxResults.size());
ResultRowsCount += rows.size();
auto& txResult = TxResults[idx];
@@ -63,7 +63,7 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnb
emptyVector.swap(rows);
}
- serializer.Deserialize(buffer, txResult.MkqlItemType, txResult.Rows);
+ serializer.Deserialize(std::move(buffer), txResult.MkqlItemType, txResult.Rows);
}
TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken,
diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
index 5180ce9ea84..c24bf3ec7a9 100644
--- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp
@@ -244,7 +244,8 @@ public:
auto& channelDesc = TasksGraph.GetChannel(outputChannelId);
NYql::NDq::TDqSerializedBatch outputData;
while (outputChannel->Pop(outputData)) {
- ResponseEv->TakeResult(channelDesc.DstInputIndex, outputData);
+ ResponseEv->TakeResult(channelDesc.DstInputIndex, std::move(outputData));
+ outputData = {};
}
YQL_ENSURE(outputChannel->IsFinished());
}
diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp
index 0ffcf1d7c66..2da9881c51a 100644
--- a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp
@@ -157,7 +157,7 @@ private:
batch.Payload = std::move(computeData.Payload);
TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry};
- auto resultSet = protoBuilder.BuildYdbResultSet(batches, ItemType, ColumnOrder);
+ auto resultSet = protoBuilder.BuildYdbResultSet(std::move(batches), ItemType, ColumnOrder);
auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>();
streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo());
@@ -193,7 +193,7 @@ private:
auto channelId = computeData.Proto.GetChannelData().GetChannelId();
- ResultReceiver->TakeResult(InputIndex, batch);
+ ResultReceiver->TakeResult(InputIndex, std::move(batch));
auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>();
diff --git a/ydb/core/kqp/runtime/kqp_transport.cpp b/ydb/core/kqp/runtime/kqp_transport.cpp
index 8669278912c..fa247f4ff54 100644
--- a/ydb/core/kqp/runtime/kqp_transport.cpp
+++ b/ydb/core/kqp/runtime/kqp_transport.cpp
@@ -47,7 +47,7 @@ TKqpProtoBuilder::~TKqpProtoBuilder() {
}
Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet(
- const TVector<NYql::NDq::TDqSerializedBatch>& data,
+ TVector<NYql::NDq::TDqSerializedBatch>&& data,
NKikimr::NMiniKQL::TType* mkqlSrcRowType,
const TVector<ui32>* columnOrder)
{
@@ -76,7 +76,7 @@ Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet(
for (auto& part : data) {
if (part.RowCount()) {
TUnboxedValueBatch rows(mkqlSrcRowType);
- dataSerializer.Deserialize(part, mkqlSrcRowType, rows);
+ dataSerializer.Deserialize(std::move(part), mkqlSrcRowType, rows);
rows.ForEachRow([&](const NUdf::TUnboxedValue& value) {
ExportValueToProto(mkqlSrcRowType, value, *resultSet.add_rows(), columnOrder);
});
diff --git a/ydb/core/kqp/runtime/kqp_transport.h b/ydb/core/kqp/runtime/kqp_transport.h
index 7c9e76d3147..77feca0b0bd 100644
--- a/ydb/core/kqp/runtime/kqp_transport.h
+++ b/ydb/core/kqp/runtime/kqp_transport.h
@@ -20,7 +20,7 @@ public:
~TKqpProtoBuilder();
- Ydb::ResultSet BuildYdbResultSet(const TVector<NYql::NDq::TDqSerializedBatch>& data,
+ Ydb::ResultSet BuildYdbResultSet(TVector<NYql::NDq::TDqSerializedBatch>&& data,
NKikimr::NMiniKQL::TType* srcRowType, const TVector<ui32>* columnOrder = nullptr);
private:
diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.h b/ydb/library/yql/dq/common/dq_serialized_batch.h
index 8423dd6c595..17379060eec 100644
--- a/ydb/library/yql/dq/common/dq_serialized_batch.h
+++ b/ydb/library/yql/dq/common/dq_serialized_batch.h
@@ -39,6 +39,17 @@ struct TDqSerializedBatch {
}
void SetPayload(const NKikimr::NMiniKQL::TPagedBuffer::TPtr& buffer);
+
+ TRope PullPayload() {
+ TRope result;
+ if (IsOOB()) {
+ result = std::move(Payload);
+ } else {
+ result = TRope(std::move(*Proto.MutableRaw()));
+ }
+ Clear();
+ return result;
+ }
};
TRope SaveForSpilling(TDqSerializedBatch&& batch);
diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.cpp b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
index f649d9e93a1..9f6bc5965bb 100644
--- a/ydb/library/yql/dq/runtime/dq_input_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
@@ -12,17 +12,18 @@ private:
void PushImpl(TDqSerializedBatch&& data) {
const i64 space = data.Size();
+ const size_t rowCount = data.RowCount();
NKikimr::NMiniKQL::TUnboxedValueBatch batch(InputType);
if (Y_UNLIKELY(ProfileStats)) {
auto startTime = TInstant::Now();
- DataSerializer.Deserialize(data, InputType, batch);
+ DataSerializer.Deserialize(std::move(data), InputType, batch);
ProfileStats->DeserializationTime += (TInstant::Now() - startTime);
} else {
- DataSerializer.Deserialize(data, InputType, batch);
+ DataSerializer.Deserialize(std::move(data), InputType, batch);
}
- YQL_ENSURE(batch.RowCount() == data.RowCount());
+ YQL_ENSURE(batch.RowCount() == rowCount);
AddBatch(std::move(batch), space);
}
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index 07744232427..f925b261155 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -278,12 +278,7 @@ public:
if (!this->Pop(chunk)) {
break;
}
- if (IsOOBTransport(TransportVersion)) {
- Packer.UnpackBatch(std::move(chunk.Payload), HolderFactory, rows);
- } else {
- TStringBuf buf(chunk.Proto.GetRaw());
- Packer.UnpackBatch(buf, HolderFactory, rows);
- }
+ Packer.UnpackBatch(chunk.PullPayload(), HolderFactory, rows);
}
if (OutputType->IsMulti()) {
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
index 3cd24aeaee8..f58e9103b5b 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp
@@ -200,7 +200,7 @@ void TestSingleRead(TTestContext& ctx) {
UNIT_ASSERT_VALUES_EQUAL(10, ch->GetStats()->RowsOut);
TUnboxedValueBatch buffer(ctx.GetOutputType());
- ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer);
+ ctx.Ds.Deserialize(std::move(data), ctx.GetOutputType(), buffer);
ValidateBatch(ctx, buffer, 0, 10);
data.Clear();
@@ -234,19 +234,20 @@ void TestPartialRead(TTestContext& ctx) {
while (readRows < 9) {
TDqSerializedBatch data;
UNIT_ASSERT(ch->Pop(data));
+ const auto rowCount = data.RowCount();
ui32 v = expected[req];
++req;
- UNIT_ASSERT_VALUES_EQUAL(v, data.RowCount());
+ UNIT_ASSERT_VALUES_EQUAL(v, rowCount);
UNIT_ASSERT_VALUES_EQUAL(++readChunks, ch->GetStats()->Chunks);
UNIT_ASSERT_VALUES_EQUAL(9, ch->GetStats()->RowsIn);
- UNIT_ASSERT_VALUES_EQUAL(readRows + data.RowCount(), ch->GetStats()->RowsOut);
+ UNIT_ASSERT_VALUES_EQUAL(readRows + rowCount, ch->GetStats()->RowsOut);
TUnboxedValueBatch buffer(ctx.GetOutputType());
- ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer);
- ValidateBatch(ctx, buffer, readRows, data.RowCount());
- readRows += data.RowCount();
+ ctx.Ds.Deserialize(std::move(data), ctx.GetOutputType(), buffer);
+ ValidateBatch(ctx, buffer, readRows, rowCount);
+ readRows += rowCount;
}
TDqSerializedBatch data;
@@ -306,7 +307,7 @@ void TestPopAll(TTestContext& ctx) {
UNIT_ASSERT_VALUES_EQUAL(50, data.RowCount());
- ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer);
+ ctx.Ds.Deserialize(std::move(data), ctx.GetOutputType(), buffer);
ValidateBatch(ctx, buffer, 0, 50);
data.Clear();
UNIT_ASSERT(!ch->Pop(data));
@@ -348,7 +349,7 @@ void TestBigRow(TTestContext& ctx) {
UNIT_ASSERT_VALUES_EQUAL(2, ch->GetStats()->RowsOut);
TUnboxedValueBatch buffer(ctx.GetOutputType());
- ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer);
+ ctx.Ds.Deserialize(std::move(data), ctx.GetOutputType(), buffer);
UNIT_ASSERT_VALUES_EQUAL(2, buffer.RowCount());
ui32 i = 1;
@@ -380,7 +381,7 @@ void TestBigRow(TTestContext& ctx) {
UNIT_ASSERT_VALUES_EQUAL(i, ch->GetStats()->RowsOut);
TUnboxedValueBatch buffer(ctx.GetOutputType());
- ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer);
+ ctx.Ds.Deserialize(std::move(data), ctx.GetOutputType(), buffer);
UNIT_ASSERT_VALUES_EQUAL(1, buffer.RowCount());
@@ -428,10 +429,11 @@ void TestSpillWithMockStorage(TTestContext& ctx) {
TDqSerializedBatch data;
while (ch->Pop(data)) {
+ const auto rowCount = data.RowCount();
TUnboxedValueBatch buffer(ctx.GetOutputType());
- ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer);
- ValidateBatch(ctx, buffer, loadedRows, data.RowCount());
- loadedRows += data.RowCount();
+ ctx.Ds.Deserialize(std::move(data), ctx.GetOutputType(), buffer);
+ ValidateBatch(ctx, buffer, loadedRows, rowCount);
+ loadedRows += rowCount;
}
UNIT_ASSERT_VALUES_EQUAL(35, loadedRows);
UNIT_ASSERT_VALUES_EQUAL(0, ch->GetValuesCount());
@@ -450,10 +452,11 @@ void TestSpillWithMockStorage(TTestContext& ctx) {
TDqSerializedBatch data;
while (ch->Pop(data)) {
+ const auto rowCount = data.RowCount();
TUnboxedValueBatch buffer(ctx.GetOutputType());
- ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer);
- ValidateBatch(ctx, buffer, loadedRows + 100, data.RowCount());
- loadedRows += data.RowCount();
+ ctx.Ds.Deserialize(std::move(data), ctx.GetOutputType(), buffer);
+ ValidateBatch(ctx, buffer, loadedRows + 100, rowCount);
+ loadedRows += rowCount;
}
UNIT_ASSERT_VALUES_EQUAL(5, loadedRows);
UNIT_ASSERT_VALUES_EQUAL(0, ch->GetValuesCount());
diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp
index c44a26ba5e9..f2fcf48dc65 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.cpp
+++ b/ydb/library/yql/dq/runtime/dq_transport.cpp
@@ -91,16 +91,16 @@ TDqSerializedBatch SerializeBuffer(NDqProto::EDataTransportVersion version, cons
return result;
}
-template<bool Fast, typename TInputBuf>
-void DeserializeValue(const TType* type, TInputBuf&& data, const THolderFactory& holderFactory, NUdf::TUnboxedValue& value)
+template<bool Fast>
+void DeserializeValue(const TType* type, TRope&& data, const THolderFactory& holderFactory, NUdf::TUnboxedValue& value)
{
using TPacker = TValuePackerTransport<Fast>;
TPacker packer(/* stable */ false, type);
value = packer.Unpack(std::move(data), holderFactory);
}
-template<bool Fast, typename TInputBuf>
-void DeserializeBuffer(const TType* itemType, TInputBuf&& data, const THolderFactory& holderFactory, TUnboxedValueBatch& buffer)
+template<bool Fast>
+void DeserializeBuffer(const TType* itemType, TRope&& data, const THolderFactory& holderFactory, TUnboxedValueBatch& buffer)
{
using TPacker = TValuePackerTransport<Fast>;
TPacker packer(/* stable */ false, itemType);
@@ -125,37 +125,33 @@ TDqSerializedBatch TDqDataSerializer::Serialize(const NKikimr::NMiniKQL::TUnboxe
return SerializeBuffer(TransportVersion, itemType, buffer);
}
-void TDqDataSerializer::Deserialize(const TDqSerializedBatch& data, const TType* itemType, TUnboxedValueBatch& buffer) const
+void TDqDataSerializer::Deserialize(TDqSerializedBatch&& data, const TType* itemType, TUnboxedValueBatch& buffer) const
{
auto guard = TypeEnv.BindAllocator();
switch (data.Proto.GetTransportVersion()) {
case NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED:
case NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0:
- return DeserializeBuffer<false, TStringBuf>(itemType, data.Proto.GetRaw(), HolderFactory, buffer);
case NDqProto::DATA_TRANSPORT_OOB_PICKLE_1_0:
- return DeserializeBuffer<false, TRope>(itemType, TRope(data.Payload), HolderFactory, buffer);
+ return DeserializeBuffer<false>(itemType, data.PullPayload(), HolderFactory, buffer);
case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0:
- return DeserializeBuffer<true, TStringBuf>(itemType, data.Proto.GetRaw(), HolderFactory, buffer);
case NDqProto::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0:
- return DeserializeBuffer<true, TRope>(itemType, TRope(data.Payload), HolderFactory, buffer);
+ return DeserializeBuffer<true>(itemType, data.PullPayload(), HolderFactory, buffer);
default:
YQL_ENSURE(false, "Unsupported TransportVersion");
}
}
-void TDqDataSerializer::Deserialize(const TDqSerializedBatch& data, const TType* itemType, NUdf::TUnboxedValue& value) const
+void TDqDataSerializer::Deserialize(TDqSerializedBatch&& data, const TType* itemType, NUdf::TUnboxedValue& value) const
{
auto guard = TypeEnv.BindAllocator();
switch (data.Proto.GetTransportVersion()) {
case NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED:
case NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0:
- return DeserializeValue<false, TStringBuf>(itemType, data.Proto.GetRaw(), HolderFactory, value);
case NDqProto::DATA_TRANSPORT_OOB_PICKLE_1_0:
- return DeserializeValue<false, TRope>(itemType, TRope(data.Payload), HolderFactory, value);
+ return DeserializeValue<false>(itemType, data.PullPayload(), HolderFactory, value);
case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0:
- return DeserializeValue<true, TStringBuf>(itemType, data.Proto.GetRaw(), HolderFactory, value);
case NDqProto::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0:
- return DeserializeValue<true, TRope>(itemType, TRope(data.Payload), HolderFactory, value);
+ return DeserializeValue<true>(itemType, data.PullPayload(), HolderFactory, value);
default:
YQL_ENSURE(false, "Unsupported TransportVersion");
}
diff --git a/ydb/library/yql/dq/runtime/dq_transport.h b/ydb/library/yql/dq/runtime/dq_transport.h
index f6c0eff8eb3..656ca49fb1e 100644
--- a/ydb/library/yql/dq/runtime/dq_transport.h
+++ b/ydb/library/yql/dq/runtime/dq_transport.h
@@ -46,9 +46,9 @@ public:
YQL_ENSURE(false, "Unsupported TransportVersion");
}
- void Deserialize(const TDqSerializedBatch& data, const NKikimr::NMiniKQL::TType* itemType,
+ void Deserialize(TDqSerializedBatch&& data, const NKikimr::NMiniKQL::TType* itemType,
NKikimr::NMiniKQL::TUnboxedValueBatch& buffer) const;
- void Deserialize(const TDqSerializedBatch& data, const NKikimr::NMiniKQL::TType* itemType, NUdf::TUnboxedValue& value) const;
+ void Deserialize(TDqSerializedBatch&& data, const NKikimr::NMiniKQL::TType* itemType, NUdf::TUnboxedValue& value) const;
struct TEstimateSizeSettings {
bool WithHeaders;
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
index 250982b3b20..6bcb017b0a9 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp
@@ -954,12 +954,6 @@ TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type)
}
template<bool Fast>
-NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TStringBuf buf, const THolderFactory& holderFactory) const {
- TChunkedInputBuffer chunked(buf);
- return DoUnpack<Fast>(Type_, chunked, buf.size(), holderFactory, State_);
-}
-
-template<bool Fast>
NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TRope&& buf, const THolderFactory& holderFactory) const {
const size_t totalSize = buf.GetSize();
TChunkedInputBuffer chunked(std::move(buf));
@@ -967,12 +961,6 @@ NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TRope&& buf, const THold
}
template<bool Fast>
-void TValuePackerTransport<Fast>::UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const {
- TChunkedInputBuffer chunked(buf);
- DoUnpackBatch<Fast>(Type_, chunked, buf.size(), holderFactory, IncrementalState_, result);
-}
-
-template<bool Fast>
void TValuePackerTransport<Fast>::UnpackBatch(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const {
const size_t totalSize = buf.GetSize();
TChunkedInputBuffer chunked(std::move(buf));
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
index 8fcaaecab97..03255a09ba5 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h
@@ -88,9 +88,7 @@ public:
// Pack()/Unpack() will pack/unpack single value of type T
TPagedBuffer::TPtr Pack(const NUdf::TUnboxedValuePod& value) const;
- NUdf::TUnboxedValue Unpack(TStringBuf buf, const THolderFactory& holderFactory) const;
NUdf::TUnboxedValue Unpack(TRope&& buf, const THolderFactory& holderFactory) const;
- void UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const;
void UnpackBatch(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const;
private:
void BuildMeta(TPagedBuffer::TPtr& buffer, bool addItemCount) const;
diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h
index 7881af883aa..60d9597252f 100644
--- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h
+++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h
@@ -62,14 +62,12 @@ class TChunkedInputBuffer : private TNonCopyable {
public:
explicit TChunkedInputBuffer(TRope&& rope)
: Rope_(std::move(rope))
- , It_(Rope_.begin())
{
Next();
}
explicit TChunkedInputBuffer(TStringBuf input)
: Rope_(TRope{})
- , It_(Rope_.begin())
, Data_(input.data())
, Len_(input.size())
{
@@ -112,15 +110,14 @@ public:
void Next() {
Y_VERIFY_DEBUG(Len_ == 0);
- Len_ = 0;
- Data_ = nullptr;
- while (It_ != Rope_.end()) {
- Len_ = It_.ContiguousSize();
- Data_ = It_.ContiguousData();
- ++It_;
- if (Len_) {
- break;
- }
+ Rope_.EraseFront(OriginalLen_);
+ if (!Rope_.IsEmpty()) {
+ Len_ = OriginalLen_ = Rope_.begin().ContiguousSize();
+ Data_ = Rope_.begin().ContiguousData();
+ Y_VERIFY_DEBUG(Len_ > 0);
+ } else {
+ Len_ = OriginalLen_ = 0;
+ Data_ = nullptr;
}
}
@@ -140,9 +137,9 @@ private:
}
TRope Rope_;
- TRope::TConstIterator It_;
const char* Data_ = nullptr;
size_t Len_ = 0;
+ size_t OriginalLen_ = 0;
};
template <typename T>
diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
index 70280927879..e369d37671a 100644
--- a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
+++ b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp
@@ -137,8 +137,7 @@ private:
try {
TFailureInjector::Reach("full_result_fail_on_write", [] { throw yexception() << "full_result_fail_on_write"; });
- NDq::TDqSerializedBatch data = request.PullSerializedBatch();
- ResultBuilder->WriteData(data, [writer = FullResultWriter.Get()](const NUdf::TUnboxedValuePod& value) {
+ ResultBuilder->WriteData(request.PullSerializedBatch(), [writer = FullResultWriter.Get()](const NUdf::TUnboxedValuePod& value) {
writer->AddRow(value);
return true;
});
diff --git a/ydb/library/yql/providers/dq/actors/proto_builder.cpp b/ydb/library/yql/providers/dq/actors/proto_builder.cpp
index 55fd654dbb0..9a236f6edd3 100644
--- a/ydb/library/yql/providers/dq/actors/proto_builder.cpp
+++ b/ydb/library/yql/providers/dq/actors/proto_builder.cpp
@@ -59,13 +59,13 @@ bool TProtoBuilder::CanBuildResultSet() const {
return ResultType->GetKind() == TType::EKind::Struct;
}
-TString TProtoBuilder::BuildYson(const TVector<NYql::NDq::TDqSerializedBatch>& rows, ui64 maxBytesLimit) {
+TString TProtoBuilder::BuildYson(TVector<NYql::NDq::TDqSerializedBatch>&& rows, ui64 maxBytesLimit) {
ui64 size = 0;
TStringStream out;
NYson::TYsonWriter writer((IOutputStream*)&out);
writer.OnBeginList();
- auto full = WriteData(rows, [&](const NYql::NUdf::TUnboxedValuePod& value) {
+ auto full = WriteData(std::move(rows), [&](const NYql::NUdf::TUnboxedValuePod& value) {
auto rowYson = NCommon::WriteYsonValue(value, ResultType, ColumnOrder.empty() ? nullptr : &ColumnOrder);
writer.OnListItem();
writer.OnRaw(rowYson);
@@ -81,14 +81,14 @@ TString TProtoBuilder::BuildYson(const TVector<NYql::NDq::TDqSerializedBatch>& r
return out.Str();
}
-bool TProtoBuilder::WriteYsonData(const NYql::NDq::TDqSerializedBatch& data, const std::function<bool(const TString& rawYson)>& func) {
- return WriteData(data, [&](const NYql::NUdf::TUnboxedValuePod& value) {
+bool TProtoBuilder::WriteYsonData(NYql::NDq::TDqSerializedBatch&& data, const std::function<bool(const TString& rawYson)>& func) {
+ return WriteData(std::move(data), [&](const NYql::NUdf::TUnboxedValuePod& value) {
auto rowYson = NCommon::WriteYsonValue(value, ResultType, ColumnOrder.empty() ? nullptr : &ColumnOrder);
return func(rowYson);
});
}
-bool TProtoBuilder::WriteData(const NYql::NDq::TDqSerializedBatch& data, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func) {
+bool TProtoBuilder::WriteData(NYql::NDq::TDqSerializedBatch&& data, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func) {
TGuard<TScopedAlloc> allocGuard(Alloc);
TMemoryUsageInfo memInfo("ProtoBuilder");
@@ -98,14 +98,14 @@ bool TProtoBuilder::WriteData(const NYql::NDq::TDqSerializedBatch& data, const s
YQL_ENSURE(!ResultType->IsMulti());
TUnboxedValueBatch buffer(ResultType);
- dataSerializer.Deserialize(data, ResultType, buffer);
+ dataSerializer.Deserialize(std::move(data), ResultType, buffer);
return buffer.ForEachRow([&func](const auto& value) {
return func(value);
});
}
-bool TProtoBuilder::WriteData(const TVector<NYql::NDq::TDqSerializedBatch>& rows, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func) {
+bool TProtoBuilder::WriteData(TVector<NYql::NDq::TDqSerializedBatch>&& rows, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func) {
TGuard<TScopedAlloc> allocGuard(Alloc);
TMemoryUsageInfo memInfo("ProtoBuilder");
@@ -115,9 +115,9 @@ bool TProtoBuilder::WriteData(const TVector<NYql::NDq::TDqSerializedBatch>& rows
YQL_ENSURE(!ResultType->IsMulti());
- for (const auto& part : rows) {
+ for (auto& part : rows) {
TUnboxedValueBatch buffer(ResultType);
- dataSerializer.Deserialize(part, ResultType, buffer);
+ dataSerializer.Deserialize(std::move(part), ResultType, buffer);
if (!buffer.ForEachRow([&func](const auto& value) { return func(value); })) {
return false;
}
@@ -125,7 +125,7 @@ bool TProtoBuilder::WriteData(const TVector<NYql::NDq::TDqSerializedBatch>& rows
return true;
}
-Ydb::ResultSet TProtoBuilder::BuildResultSet(const TVector<NYql::NDq::TDqSerializedBatch>& data) {
+Ydb::ResultSet TProtoBuilder::BuildResultSet(TVector<NYql::NDq::TDqSerializedBatch>&& data) {
Ydb::ResultSet resultSet;
auto structType = AS_TYPE(TStructType, ResultType);
MKQL_ENSURE(structType, "Result is not a struct");
@@ -136,7 +136,7 @@ Ydb::ResultSet TProtoBuilder::BuildResultSet(const TVector<NYql::NDq::TDqSeriali
ExportTypeToProto(structType->GetMemberType(memberIndex), *column.mutable_type());
}
- WriteData(data, [&](const NYql::NUdf::TUnboxedValuePod& value) {
+ WriteData(std::move(data), [&](const NYql::NUdf::TUnboxedValuePod& value) {
ExportValueToProto(ResultType, value, *resultSet.add_rows(), &ColumnOrder);
return true;
});
diff --git a/ydb/library/yql/providers/dq/actors/proto_builder.h b/ydb/library/yql/providers/dq/actors/proto_builder.h
index 138b39ad798..e494608661a 100644
--- a/ydb/library/yql/providers/dq/actors/proto_builder.h
+++ b/ydb/library/yql/providers/dq/actors/proto_builder.h
@@ -19,11 +19,11 @@ public:
~TProtoBuilder();
bool CanBuildResultSet() const;
- Ydb::ResultSet BuildResultSet(const TVector<NYql::NDq::TDqSerializedBatch>& data);
- TString BuildYson(const TVector<NYql::NDq::TDqSerializedBatch>& data, ui64 maxBytesLimit = std::numeric_limits<ui64>::max());
- bool WriteYsonData(const NYql::NDq::TDqSerializedBatch& data, const std::function<bool(const TString& rawYson)>& func);
- bool WriteData(const NYql::NDq::TDqSerializedBatch& data, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func);
- bool WriteData(const TVector<NYql::NDq::TDqSerializedBatch>& data, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func);
+ Ydb::ResultSet BuildResultSet(TVector<NYql::NDq::TDqSerializedBatch>&& data);
+ TString BuildYson(TVector<NYql::NDq::TDqSerializedBatch>&& data, ui64 maxBytesLimit = std::numeric_limits<ui64>::max());
+ bool WriteYsonData(NYql::NDq::TDqSerializedBatch&& data, const std::function<bool(const TString& rawYson)>& func);
+ bool WriteData(NYql::NDq::TDqSerializedBatch&& data, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func);
+ bool WriteData(TVector<NYql::NDq::TDqSerializedBatch>&& data, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func);
TString GetSerializedType() const;
TString AllocDebugInfo();
diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h
index e852f3135a9..684631417c4 100644
--- a/ydb/library/yql/providers/dq/actors/result_actor_base.h
+++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h
@@ -83,7 +83,8 @@ namespace NYql::NDqs::NExecutionHelpers {
bool exceedRows = false;
try {
TFailureInjector::Reach("result_actor_base_fail_on_response_write", [] { throw yexception() << "result_actor_base_fail_on_response_write"; });
- full = ResultBuilder->WriteYsonData(WriteQueue.back().Data, [this, &exceedRows](const TString& rawYson) {
+ NDq::TDqSerializedBatch dataCopy = WriteQueue.back().Data;
+ full = ResultBuilder->WriteYsonData(std::move(dataCopy), [this, &exceedRows](const TString& rawYson) {
if (RowsLimit && Rows + 1 > *RowsLimit) {
exceedRows = true;
return false;
diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
index 24d6d6f38bb..60515cba7e2 100644
--- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
+++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp
@@ -182,7 +182,7 @@ public:
auto serializedResultType = GetSerializedResultType(lambda);
NYql::NDqs::TProtoBuilder protoBuilder(serializedResultType, columns);
- result.Data = protoBuilder.BuildYson(rows);
+ result.Data = protoBuilder.BuildYson(std::move(rows));
AddCounter("LocalRun", TInstant::Now() - t);
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index f8e290e2584..619c27ffdee 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -373,7 +373,7 @@ public:
if (request.GetString().empty() && request.GetChunks() == 0) {
NDq::TDqSerializedBatch batch;
batch.Proto = std::move(*request.MutableData());
- dataSerializer.Deserialize(batch, source->GetInputType(), buffer);
+ dataSerializer.Deserialize(std::move(batch), source->GetInputType(), buffer);
} else if (!request.GetString().empty()) {
for (auto& row : request.GetString()) {
buffer.emplace_back(NKikimr::NMiniKQL::MakeString(row));
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index fd0e333206d..24f20fabb76 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1124,7 +1124,7 @@ public:
TDqSerializedBatch serialized;
serialized.Proto = std::move(*response.MutableData());
- dataSerializer.Deserialize(serialized, GetOutputType(), batch);
+ dataSerializer.Deserialize(std::move(serialized), GetOutputType(), batch);
return response.GetBytes();
} catch (...) {