diff options
author | aneporada <[email protected]> | 2023-07-13 12:54:02 +0300 |
---|---|---|
committer | aneporada <[email protected]> | 2023-07-13 12:54:02 +0300 |
commit | 6b539b3b1dc62f263202e7568e3532fa4c6f8db8 (patch) | |
tree | 25c2212447550d31b36c9bb2176dcff5dafc6d38 | |
parent | 3e821290327605ba1c4ad4a9cf2ce3b19acb8139 (diff) |
Pass data for deserialization by rvalue reference
This way we can:
1) reduce memory usage - serialized data chunks can be freed immediately after being read
2) support zero-copy deserialization: resulting unboxed values can hold and own references to serialized memory regions (this is especially helpful for Arrow blocks)
23 files changed, 94 insertions, 104 deletions
diff --git a/ydb/core/fq/libs/actors/result_writer.cpp b/ydb/core/fq/libs/actors/result_writer.cpp index 846cb2bdf6e..51cc08be9d6 100644 --- a/ydb/core/fq/libs/actors/result_writer.cpp +++ b/ydb/core/fq/libs/actors/result_writer.cpp @@ -111,7 +111,8 @@ private: Finished = true; NYql::NDqProto::TQueryResponse queryResult(ev->Get()->Record); - *queryResult.MutableYson() = ResultBuilder->BuildYson(Head); + *queryResult.MutableYson() = ResultBuilder->BuildYson(std::move(Head)); + Head.clear(); if (!Issues.Empty()) { IssuesToMessage(Issues, queryResult.MutableIssues()); } @@ -271,8 +272,7 @@ private: return; } - TVector<NDq::TDqSerializedBatch> batch(1); - NDq::TDqSerializedBatch& data = batch.front(); + NDq::TDqSerializedBatch data; data.Proto = std::move(*ev->Get()->Record.MutableChannelData()->MutableData()); if (data.Proto.HasPayloadId()) { data.Payload = ev->Get()->GetPayload(data.Proto.GetPayloadId()); @@ -280,7 +280,7 @@ private: FreeSpace -= data.Size(); OccupiedSpace += data.Size(); - auto resultSet = ResultBuilder->BuildResultSet(batch); + auto resultSet = ResultBuilder->BuildResultSet({ data }); if (OccupiedSpace > ResultBytesLimit) { TIssues issues; diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 5d6879e69ce..1cc61beaf86 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -37,8 +37,8 @@ struct TEvKqpExecuter { TVector<TKqpPhyTxHolder::TConstPtr>& GetTxHolders() { return TxHolders; } TVector<TKqpExecuterTxResult>& GetTxResults() { return TxResults; } void InitTxResult(const TKqpPhyTxHolder::TConstPtr& tx); - void TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnboxedValueVector& rows); - void TakeResult(ui32 idx, const NYql::NDq::TDqSerializedBatch& rows); + void TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnboxedValueVector&& rows); + void TakeResult(ui32 idx, NYql::NDq::TDqSerializedBatch&& rows); ui64 GetResultRowsCount() const { return 
ResultRowsCount; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 18265decf21..5be1366333c 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -28,7 +28,7 @@ void TEvKqpExecuter::TEvTxResponse::InitTxResult(const TKqpPhyTxHolder::TConstPt } } -void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, const NDq::TDqSerializedBatch& rows) { +void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NDq::TDqSerializedBatch&& rows) { YQL_ENSURE(idx < TxResults.size()); ResultRowsCount += rows.RowCount(); ResultRowsBytes += rows.Size(); @@ -38,7 +38,7 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, const NDq::TDqSerialize NDq::TDqDataSerializer dataSerializer( AllocState->TypeEnv, AllocState->HolderFactory, static_cast<NDqProto::EDataTransportVersion>(rows.Proto.GetTransportVersion())); - dataSerializer.Deserialize(rows, result.MkqlItemType, result.Rows); + dataSerializer.Deserialize(std::move(rows), result.MkqlItemType, result.Rows); } } @@ -50,7 +50,7 @@ TEvKqpExecuter::TEvTxResponse::~TEvTxResponse() { } } -void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnboxedValueVector& rows) { +void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnboxedValueVector&& rows) { YQL_ENSURE(idx < TxResults.size()); ResultRowsCount += rows.size(); auto& txResult = TxResults[idx]; @@ -63,7 +63,7 @@ void TEvKqpExecuter::TEvTxResponse::TakeResult(ui32 idx, NKikimr::NMiniKQL::TUnb emptyVector.swap(rows); } - serializer.Deserialize(buffer, txResult.MkqlItemType, txResult.Rows); + serializer.Deserialize(std::move(buffer), txResult.MkqlItemType, txResult.Rows); } TActorId ReportToRl(ui64 ru, const TString& database, const TString& userToken, diff --git a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp index 5180ce9ea84..c24bf3ec7a9 
100644 --- a/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_literal_executer.cpp @@ -244,7 +244,8 @@ public: auto& channelDesc = TasksGraph.GetChannel(outputChannelId); NYql::NDq::TDqSerializedBatch outputData; while (outputChannel->Pop(outputData)) { - ResponseEv->TakeResult(channelDesc.DstInputIndex, outputData); + ResponseEv->TakeResult(channelDesc.DstInputIndex, std::move(outputData)); + outputData = {}; } YQL_ENSURE(outputChannel->IsFinished()); } diff --git a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp index 0ffcf1d7c66..2da9881c51a 100644 --- a/ydb/core/kqp/executer_actor/kqp_result_channel.cpp +++ b/ydb/core/kqp/executer_actor/kqp_result_channel.cpp @@ -157,7 +157,7 @@ private: batch.Payload = std::move(computeData.Payload); TKqpProtoBuilder protoBuilder{*AppData()->FunctionRegistry}; - auto resultSet = protoBuilder.BuildYdbResultSet(batches, ItemType, ColumnOrder); + auto resultSet = protoBuilder.BuildYdbResultSet(std::move(batches), ItemType, ColumnOrder); auto streamEv = MakeHolder<TEvKqpExecuter::TEvStreamData>(); streamEv->Record.SetSeqNo(computeData.Proto.GetSeqNo()); @@ -193,7 +193,7 @@ private: auto channelId = computeData.Proto.GetChannelData().GetChannelId(); - ResultReceiver->TakeResult(InputIndex, batch); + ResultReceiver->TakeResult(InputIndex, std::move(batch)); auto ackEv = MakeHolder<NYql::NDq::TEvDqCompute::TEvChannelDataAck>(); diff --git a/ydb/core/kqp/runtime/kqp_transport.cpp b/ydb/core/kqp/runtime/kqp_transport.cpp index 8669278912c..fa247f4ff54 100644 --- a/ydb/core/kqp/runtime/kqp_transport.cpp +++ b/ydb/core/kqp/runtime/kqp_transport.cpp @@ -47,7 +47,7 @@ TKqpProtoBuilder::~TKqpProtoBuilder() { } Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet( - const TVector<NYql::NDq::TDqSerializedBatch>& data, + TVector<NYql::NDq::TDqSerializedBatch>&& data, NKikimr::NMiniKQL::TType* mkqlSrcRowType, const TVector<ui32>* columnOrder) { @@ 
-76,7 +76,7 @@ Ydb::ResultSet TKqpProtoBuilder::BuildYdbResultSet( for (auto& part : data) { if (part.RowCount()) { TUnboxedValueBatch rows(mkqlSrcRowType); - dataSerializer.Deserialize(part, mkqlSrcRowType, rows); + dataSerializer.Deserialize(std::move(part), mkqlSrcRowType, rows); rows.ForEachRow([&](const NUdf::TUnboxedValue& value) { ExportValueToProto(mkqlSrcRowType, value, *resultSet.add_rows(), columnOrder); }); diff --git a/ydb/core/kqp/runtime/kqp_transport.h b/ydb/core/kqp/runtime/kqp_transport.h index 7c9e76d3147..77feca0b0bd 100644 --- a/ydb/core/kqp/runtime/kqp_transport.h +++ b/ydb/core/kqp/runtime/kqp_transport.h @@ -20,7 +20,7 @@ public: ~TKqpProtoBuilder(); - Ydb::ResultSet BuildYdbResultSet(const TVector<NYql::NDq::TDqSerializedBatch>& data, + Ydb::ResultSet BuildYdbResultSet(TVector<NYql::NDq::TDqSerializedBatch>&& data, NKikimr::NMiniKQL::TType* srcRowType, const TVector<ui32>* columnOrder = nullptr); private: diff --git a/ydb/library/yql/dq/common/dq_serialized_batch.h b/ydb/library/yql/dq/common/dq_serialized_batch.h index 8423dd6c595..17379060eec 100644 --- a/ydb/library/yql/dq/common/dq_serialized_batch.h +++ b/ydb/library/yql/dq/common/dq_serialized_batch.h @@ -39,6 +39,17 @@ struct TDqSerializedBatch { } void SetPayload(const NKikimr::NMiniKQL::TPagedBuffer::TPtr& buffer); + + TRope PullPayload() { + TRope result; + if (IsOOB()) { + result = std::move(Payload); + } else { + result = TRope(std::move(*Proto.MutableRaw())); + } + Clear(); + return result; + } }; TRope SaveForSpilling(TDqSerializedBatch&& batch); diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.cpp b/ydb/library/yql/dq/runtime/dq_input_channel.cpp index f649d9e93a1..9f6bc5965bb 100644 --- a/ydb/library/yql/dq/runtime/dq_input_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_input_channel.cpp @@ -12,17 +12,18 @@ private: void PushImpl(TDqSerializedBatch&& data) { const i64 space = data.Size(); + const size_t rowCount = data.RowCount(); 
NKikimr::NMiniKQL::TUnboxedValueBatch batch(InputType); if (Y_UNLIKELY(ProfileStats)) { auto startTime = TInstant::Now(); - DataSerializer.Deserialize(data, InputType, batch); + DataSerializer.Deserialize(std::move(data), InputType, batch); ProfileStats->DeserializationTime += (TInstant::Now() - startTime); } else { - DataSerializer.Deserialize(data, InputType, batch); + DataSerializer.Deserialize(std::move(data), InputType, batch); } - YQL_ENSURE(batch.RowCount() == data.RowCount()); + YQL_ENSURE(batch.RowCount() == rowCount); AddBatch(std::move(batch), space); } diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp index 07744232427..f925b261155 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp @@ -278,12 +278,7 @@ public: if (!this->Pop(chunk)) { break; } - if (IsOOBTransport(TransportVersion)) { - Packer.UnpackBatch(std::move(chunk.Payload), HolderFactory, rows); - } else { - TStringBuf buf(chunk.Proto.GetRaw()); - Packer.UnpackBatch(buf, HolderFactory, rows); - } + Packer.UnpackBatch(chunk.PullPayload(), HolderFactory, rows); } if (OutputType->IsMulti()) { diff --git a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp index 3cd24aeaee8..f58e9103b5b 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel_ut.cpp @@ -200,7 +200,7 @@ void TestSingleRead(TTestContext& ctx) { UNIT_ASSERT_VALUES_EQUAL(10, ch->GetStats()->RowsOut); TUnboxedValueBatch buffer(ctx.GetOutputType()); - ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer); + ctx.Ds.Deserialize(std::move(data), ctx.GetOutputType(), buffer); ValidateBatch(ctx, buffer, 0, 10); data.Clear(); @@ -234,19 +234,20 @@ void TestPartialRead(TTestContext& ctx) { while (readRows < 9) { TDqSerializedBatch data; UNIT_ASSERT(ch->Pop(data)); + const auto rowCount = data.RowCount(); 
ui32 v = expected[req]; ++req; - UNIT_ASSERT_VALUES_EQUAL(v, data.RowCount()); + UNIT_ASSERT_VALUES_EQUAL(v, rowCount); UNIT_ASSERT_VALUES_EQUAL(++readChunks, ch->GetStats()->Chunks); UNIT_ASSERT_VALUES_EQUAL(9, ch->GetStats()->RowsIn); - UNIT_ASSERT_VALUES_EQUAL(readRows + data.RowCount(), ch->GetStats()->RowsOut); + UNIT_ASSERT_VALUES_EQUAL(readRows + rowCount, ch->GetStats()->RowsOut); TUnboxedValueBatch buffer(ctx.GetOutputType()); - ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer); - ValidateBatch(ctx, buffer, readRows, data.RowCount()); - readRows += data.RowCount(); + ctx.Ds.Deserialize(std::move(data), ctx.GetOutputType(), buffer); + ValidateBatch(ctx, buffer, readRows, rowCount); + readRows += rowCount; } TDqSerializedBatch data; @@ -306,7 +307,7 @@ void TestPopAll(TTestContext& ctx) { UNIT_ASSERT_VALUES_EQUAL(50, data.RowCount()); - ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer); + ctx.Ds.Deserialize(std::move(data), ctx.GetOutputType(), buffer); ValidateBatch(ctx, buffer, 0, 50); data.Clear(); UNIT_ASSERT(!ch->Pop(data)); @@ -348,7 +349,7 @@ void TestBigRow(TTestContext& ctx) { UNIT_ASSERT_VALUES_EQUAL(2, ch->GetStats()->RowsOut); TUnboxedValueBatch buffer(ctx.GetOutputType()); - ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer); + ctx.Ds.Deserialize(std::move(data), ctx.GetOutputType(), buffer); UNIT_ASSERT_VALUES_EQUAL(2, buffer.RowCount()); ui32 i = 1; @@ -380,7 +381,7 @@ void TestBigRow(TTestContext& ctx) { UNIT_ASSERT_VALUES_EQUAL(i, ch->GetStats()->RowsOut); TUnboxedValueBatch buffer(ctx.GetOutputType()); - ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer); + ctx.Ds.Deserialize(std::move(data), ctx.GetOutputType(), buffer); UNIT_ASSERT_VALUES_EQUAL(1, buffer.RowCount()); @@ -428,10 +429,11 @@ void TestSpillWithMockStorage(TTestContext& ctx) { TDqSerializedBatch data; while (ch->Pop(data)) { + const auto rowCount = data.RowCount(); TUnboxedValueBatch buffer(ctx.GetOutputType()); - ctx.Ds.Deserialize(data, ctx.GetOutputType(), 
buffer); - ValidateBatch(ctx, buffer, loadedRows, data.RowCount()); - loadedRows += data.RowCount(); + ctx.Ds.Deserialize(std::move(data), ctx.GetOutputType(), buffer); + ValidateBatch(ctx, buffer, loadedRows, rowCount); + loadedRows += rowCount; } UNIT_ASSERT_VALUES_EQUAL(35, loadedRows); UNIT_ASSERT_VALUES_EQUAL(0, ch->GetValuesCount()); @@ -450,10 +452,11 @@ void TestSpillWithMockStorage(TTestContext& ctx) { TDqSerializedBatch data; while (ch->Pop(data)) { + const auto rowCount = data.RowCount(); TUnboxedValueBatch buffer(ctx.GetOutputType()); - ctx.Ds.Deserialize(data, ctx.GetOutputType(), buffer); - ValidateBatch(ctx, buffer, loadedRows + 100, data.RowCount()); - loadedRows += data.RowCount(); + ctx.Ds.Deserialize(std::move(data), ctx.GetOutputType(), buffer); + ValidateBatch(ctx, buffer, loadedRows + 100, rowCount); + loadedRows += rowCount; } UNIT_ASSERT_VALUES_EQUAL(5, loadedRows); UNIT_ASSERT_VALUES_EQUAL(0, ch->GetValuesCount()); diff --git a/ydb/library/yql/dq/runtime/dq_transport.cpp b/ydb/library/yql/dq/runtime/dq_transport.cpp index c44a26ba5e9..f2fcf48dc65 100644 --- a/ydb/library/yql/dq/runtime/dq_transport.cpp +++ b/ydb/library/yql/dq/runtime/dq_transport.cpp @@ -91,16 +91,16 @@ TDqSerializedBatch SerializeBuffer(NDqProto::EDataTransportVersion version, cons return result; } -template<bool Fast, typename TInputBuf> -void DeserializeValue(const TType* type, TInputBuf&& data, const THolderFactory& holderFactory, NUdf::TUnboxedValue& value) +template<bool Fast> +void DeserializeValue(const TType* type, TRope&& data, const THolderFactory& holderFactory, NUdf::TUnboxedValue& value) { using TPacker = TValuePackerTransport<Fast>; TPacker packer(/* stable */ false, type); value = packer.Unpack(std::move(data), holderFactory); } -template<bool Fast, typename TInputBuf> -void DeserializeBuffer(const TType* itemType, TInputBuf&& data, const THolderFactory& holderFactory, TUnboxedValueBatch& buffer) +template<bool Fast> +void DeserializeBuffer(const TType* 
itemType, TRope&& data, const THolderFactory& holderFactory, TUnboxedValueBatch& buffer) { using TPacker = TValuePackerTransport<Fast>; TPacker packer(/* stable */ false, itemType); @@ -125,37 +125,33 @@ TDqSerializedBatch TDqDataSerializer::Serialize(const NKikimr::NMiniKQL::TUnboxe return SerializeBuffer(TransportVersion, itemType, buffer); } -void TDqDataSerializer::Deserialize(const TDqSerializedBatch& data, const TType* itemType, TUnboxedValueBatch& buffer) const +void TDqDataSerializer::Deserialize(TDqSerializedBatch&& data, const TType* itemType, TUnboxedValueBatch& buffer) const { auto guard = TypeEnv.BindAllocator(); switch (data.Proto.GetTransportVersion()) { case NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED: case NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0: - return DeserializeBuffer<false, TStringBuf>(itemType, data.Proto.GetRaw(), HolderFactory, buffer); case NDqProto::DATA_TRANSPORT_OOB_PICKLE_1_0: - return DeserializeBuffer<false, TRope>(itemType, TRope(data.Payload), HolderFactory, buffer); + return DeserializeBuffer<false>(itemType, data.PullPayload(), HolderFactory, buffer); case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0: - return DeserializeBuffer<true, TStringBuf>(itemType, data.Proto.GetRaw(), HolderFactory, buffer); case NDqProto::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0: - return DeserializeBuffer<true, TRope>(itemType, TRope(data.Payload), HolderFactory, buffer); + return DeserializeBuffer<true>(itemType, data.PullPayload(), HolderFactory, buffer); default: YQL_ENSURE(false, "Unsupported TransportVersion"); } } -void TDqDataSerializer::Deserialize(const TDqSerializedBatch& data, const TType* itemType, NUdf::TUnboxedValue& value) const +void TDqDataSerializer::Deserialize(TDqSerializedBatch&& data, const TType* itemType, NUdf::TUnboxedValue& value) const { auto guard = TypeEnv.BindAllocator(); switch (data.Proto.GetTransportVersion()) { case NDqProto::DATA_TRANSPORT_VERSION_UNSPECIFIED: case NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0: - return 
DeserializeValue<false, TStringBuf>(itemType, data.Proto.GetRaw(), HolderFactory, value); case NDqProto::DATA_TRANSPORT_OOB_PICKLE_1_0: - return DeserializeValue<false, TRope>(itemType, TRope(data.Payload), HolderFactory, value); + return DeserializeValue<false>(itemType, data.PullPayload(), HolderFactory, value); case NDqProto::DATA_TRANSPORT_UV_FAST_PICKLE_1_0: - return DeserializeValue<true, TStringBuf>(itemType, data.Proto.GetRaw(), HolderFactory, value); case NDqProto::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0: - return DeserializeValue<true, TRope>(itemType, TRope(data.Payload), HolderFactory, value); + return DeserializeValue<true>(itemType, data.PullPayload(), HolderFactory, value); default: YQL_ENSURE(false, "Unsupported TransportVersion"); } diff --git a/ydb/library/yql/dq/runtime/dq_transport.h b/ydb/library/yql/dq/runtime/dq_transport.h index f6c0eff8eb3..656ca49fb1e 100644 --- a/ydb/library/yql/dq/runtime/dq_transport.h +++ b/ydb/library/yql/dq/runtime/dq_transport.h @@ -46,9 +46,9 @@ public: YQL_ENSURE(false, "Unsupported TransportVersion"); } - void Deserialize(const TDqSerializedBatch& data, const NKikimr::NMiniKQL::TType* itemType, + void Deserialize(TDqSerializedBatch&& data, const NKikimr::NMiniKQL::TType* itemType, NKikimr::NMiniKQL::TUnboxedValueBatch& buffer) const; - void Deserialize(const TDqSerializedBatch& data, const NKikimr::NMiniKQL::TType* itemType, NUdf::TUnboxedValue& value) const; + void Deserialize(TDqSerializedBatch&& data, const NKikimr::NMiniKQL::TType* itemType, NUdf::TUnboxedValue& value) const; struct TEstimateSizeSettings { bool WithHeaders; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp index 250982b3b20..6bcb017b0a9 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.cpp @@ -954,12 +954,6 @@ 
TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type) } template<bool Fast> -NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TStringBuf buf, const THolderFactory& holderFactory) const { - TChunkedInputBuffer chunked(buf); - return DoUnpack<Fast>(Type_, chunked, buf.size(), holderFactory, State_); -} - -template<bool Fast> NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TRope&& buf, const THolderFactory& holderFactory) const { const size_t totalSize = buf.GetSize(); TChunkedInputBuffer chunked(std::move(buf)); @@ -967,12 +961,6 @@ NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TRope&& buf, const THold } template<bool Fast> -void TValuePackerTransport<Fast>::UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const { - TChunkedInputBuffer chunked(buf); - DoUnpackBatch<Fast>(Type_, chunked, buf.size(), holderFactory, IncrementalState_, result); -} - -template<bool Fast> void TValuePackerTransport<Fast>::UnpackBatch(TRope&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const { const size_t totalSize = buf.GetSize(); TChunkedInputBuffer chunked(std::move(buf)); diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h index 8fcaaecab97..03255a09ba5 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack.h @@ -88,9 +88,7 @@ public: // Pack()/Unpack() will pack/unpack single value of type T TPagedBuffer::TPtr Pack(const NUdf::TUnboxedValuePod& value) const; - NUdf::TUnboxedValue Unpack(TStringBuf buf, const THolderFactory& holderFactory) const; NUdf::TUnboxedValue Unpack(TRope&& buf, const THolderFactory& holderFactory) const; - void UnpackBatch(TStringBuf buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const; void UnpackBatch(TRope&& buf, const THolderFactory& 
holderFactory, TUnboxedValueBatch& result) const; private: void BuildMeta(TPagedBuffer::TPtr& buffer, bool addItemCount) const; diff --git a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h index 7881af883aa..60d9597252f 100644 --- a/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h +++ b/ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h @@ -62,14 +62,12 @@ class TChunkedInputBuffer : private TNonCopyable { public: explicit TChunkedInputBuffer(TRope&& rope) : Rope_(std::move(rope)) - , It_(Rope_.begin()) { Next(); } explicit TChunkedInputBuffer(TStringBuf input) : Rope_(TRope{}) - , It_(Rope_.begin()) , Data_(input.data()) , Len_(input.size()) { @@ -112,15 +110,14 @@ public: void Next() { Y_VERIFY_DEBUG(Len_ == 0); - Len_ = 0; - Data_ = nullptr; - while (It_ != Rope_.end()) { - Len_ = It_.ContiguousSize(); - Data_ = It_.ContiguousData(); - ++It_; - if (Len_) { - break; - } + Rope_.EraseFront(OriginalLen_); + if (!Rope_.IsEmpty()) { + Len_ = OriginalLen_ = Rope_.begin().ContiguousSize(); + Data_ = Rope_.begin().ContiguousData(); + Y_VERIFY_DEBUG(Len_ > 0); + } else { + Len_ = OriginalLen_ = 0; + Data_ = nullptr; } } @@ -140,9 +137,9 @@ private: } TRope Rope_; - TRope::TConstIterator It_; const char* Data_ = nullptr; size_t Len_ = 0; + size_t OriginalLen_ = 0; }; template <typename T> diff --git a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp index 70280927879..e369d37671a 100644 --- a/ydb/library/yql/providers/dq/actors/full_result_writer.cpp +++ b/ydb/library/yql/providers/dq/actors/full_result_writer.cpp @@ -137,8 +137,7 @@ private: try { TFailureInjector::Reach("full_result_fail_on_write", [] { throw yexception() << "full_result_fail_on_write"; }); - NDq::TDqSerializedBatch data = request.PullSerializedBatch(); - ResultBuilder->WriteData(data, [writer = 
FullResultWriter.Get()](const NUdf::TUnboxedValuePod& value) { + ResultBuilder->WriteData(request.PullSerializedBatch(), [writer = FullResultWriter.Get()](const NUdf::TUnboxedValuePod& value) { writer->AddRow(value); return true; }); diff --git a/ydb/library/yql/providers/dq/actors/proto_builder.cpp b/ydb/library/yql/providers/dq/actors/proto_builder.cpp index 55fd654dbb0..9a236f6edd3 100644 --- a/ydb/library/yql/providers/dq/actors/proto_builder.cpp +++ b/ydb/library/yql/providers/dq/actors/proto_builder.cpp @@ -59,13 +59,13 @@ bool TProtoBuilder::CanBuildResultSet() const { return ResultType->GetKind() == TType::EKind::Struct; } -TString TProtoBuilder::BuildYson(const TVector<NYql::NDq::TDqSerializedBatch>& rows, ui64 maxBytesLimit) { +TString TProtoBuilder::BuildYson(TVector<NYql::NDq::TDqSerializedBatch>&& rows, ui64 maxBytesLimit) { ui64 size = 0; TStringStream out; NYson::TYsonWriter writer((IOutputStream*)&out); writer.OnBeginList(); - auto full = WriteData(rows, [&](const NYql::NUdf::TUnboxedValuePod& value) { + auto full = WriteData(std::move(rows), [&](const NYql::NUdf::TUnboxedValuePod& value) { auto rowYson = NCommon::WriteYsonValue(value, ResultType, ColumnOrder.empty() ? nullptr : &ColumnOrder); writer.OnListItem(); writer.OnRaw(rowYson); @@ -81,14 +81,14 @@ TString TProtoBuilder::BuildYson(const TVector<NYql::NDq::TDqSerializedBatch>& r return out.Str(); } -bool TProtoBuilder::WriteYsonData(const NYql::NDq::TDqSerializedBatch& data, const std::function<bool(const TString& rawYson)>& func) { - return WriteData(data, [&](const NYql::NUdf::TUnboxedValuePod& value) { +bool TProtoBuilder::WriteYsonData(NYql::NDq::TDqSerializedBatch&& data, const std::function<bool(const TString& rawYson)>& func) { + return WriteData(std::move(data), [&](const NYql::NUdf::TUnboxedValuePod& value) { auto rowYson = NCommon::WriteYsonValue(value, ResultType, ColumnOrder.empty() ? 
nullptr : &ColumnOrder); return func(rowYson); }); } -bool TProtoBuilder::WriteData(const NYql::NDq::TDqSerializedBatch& data, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func) { +bool TProtoBuilder::WriteData(NYql::NDq::TDqSerializedBatch&& data, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func) { TGuard<TScopedAlloc> allocGuard(Alloc); TMemoryUsageInfo memInfo("ProtoBuilder"); @@ -98,14 +98,14 @@ bool TProtoBuilder::WriteData(const NYql::NDq::TDqSerializedBatch& data, const s YQL_ENSURE(!ResultType->IsMulti()); TUnboxedValueBatch buffer(ResultType); - dataSerializer.Deserialize(data, ResultType, buffer); + dataSerializer.Deserialize(std::move(data), ResultType, buffer); return buffer.ForEachRow([&func](const auto& value) { return func(value); }); } -bool TProtoBuilder::WriteData(const TVector<NYql::NDq::TDqSerializedBatch>& rows, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func) { +bool TProtoBuilder::WriteData(TVector<NYql::NDq::TDqSerializedBatch>&& rows, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func) { TGuard<TScopedAlloc> allocGuard(Alloc); TMemoryUsageInfo memInfo("ProtoBuilder"); @@ -115,9 +115,9 @@ bool TProtoBuilder::WriteData(const TVector<NYql::NDq::TDqSerializedBatch>& rows YQL_ENSURE(!ResultType->IsMulti()); - for (const auto& part : rows) { + for (auto& part : rows) { TUnboxedValueBatch buffer(ResultType); - dataSerializer.Deserialize(part, ResultType, buffer); + dataSerializer.Deserialize(std::move(part), ResultType, buffer); if (!buffer.ForEachRow([&func](const auto& value) { return func(value); })) { return false; } @@ -125,7 +125,7 @@ bool TProtoBuilder::WriteData(const TVector<NYql::NDq::TDqSerializedBatch>& rows return true; } -Ydb::ResultSet TProtoBuilder::BuildResultSet(const TVector<NYql::NDq::TDqSerializedBatch>& data) { +Ydb::ResultSet TProtoBuilder::BuildResultSet(TVector<NYql::NDq::TDqSerializedBatch>&& data) { Ydb::ResultSet 
resultSet; auto structType = AS_TYPE(TStructType, ResultType); MKQL_ENSURE(structType, "Result is not a struct"); @@ -136,7 +136,7 @@ Ydb::ResultSet TProtoBuilder::BuildResultSet(const TVector<NYql::NDq::TDqSeriali ExportTypeToProto(structType->GetMemberType(memberIndex), *column.mutable_type()); } - WriteData(data, [&](const NYql::NUdf::TUnboxedValuePod& value) { + WriteData(std::move(data), [&](const NYql::NUdf::TUnboxedValuePod& value) { ExportValueToProto(ResultType, value, *resultSet.add_rows(), &ColumnOrder); return true; }); diff --git a/ydb/library/yql/providers/dq/actors/proto_builder.h b/ydb/library/yql/providers/dq/actors/proto_builder.h index 138b39ad798..e494608661a 100644 --- a/ydb/library/yql/providers/dq/actors/proto_builder.h +++ b/ydb/library/yql/providers/dq/actors/proto_builder.h @@ -19,11 +19,11 @@ public: ~TProtoBuilder(); bool CanBuildResultSet() const; - Ydb::ResultSet BuildResultSet(const TVector<NYql::NDq::TDqSerializedBatch>& data); - TString BuildYson(const TVector<NYql::NDq::TDqSerializedBatch>& data, ui64 maxBytesLimit = std::numeric_limits<ui64>::max()); - bool WriteYsonData(const NYql::NDq::TDqSerializedBatch& data, const std::function<bool(const TString& rawYson)>& func); - bool WriteData(const NYql::NDq::TDqSerializedBatch& data, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func); - bool WriteData(const TVector<NYql::NDq::TDqSerializedBatch>& data, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func); + Ydb::ResultSet BuildResultSet(TVector<NYql::NDq::TDqSerializedBatch>&& data); + TString BuildYson(TVector<NYql::NDq::TDqSerializedBatch>&& data, ui64 maxBytesLimit = std::numeric_limits<ui64>::max()); + bool WriteYsonData(NYql::NDq::TDqSerializedBatch&& data, const std::function<bool(const TString& rawYson)>& func); + bool WriteData(NYql::NDq::TDqSerializedBatch&& data, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func); + bool 
WriteData(TVector<NYql::NDq::TDqSerializedBatch>&& data, const std::function<bool(const NYql::NUdf::TUnboxedValuePod& value)>& func); TString GetSerializedType() const; TString AllocDebugInfo(); diff --git a/ydb/library/yql/providers/dq/actors/result_actor_base.h b/ydb/library/yql/providers/dq/actors/result_actor_base.h index e852f3135a9..684631417c4 100644 --- a/ydb/library/yql/providers/dq/actors/result_actor_base.h +++ b/ydb/library/yql/providers/dq/actors/result_actor_base.h @@ -83,7 +83,8 @@ namespace NYql::NDqs::NExecutionHelpers { bool exceedRows = false; try { TFailureInjector::Reach("result_actor_base_fail_on_response_write", [] { throw yexception() << "result_actor_base_fail_on_response_write"; }); - full = ResultBuilder->WriteYsonData(WriteQueue.back().Data, [this, &exceedRows](const TString& rawYson) { + NDq::TDqSerializedBatch dataCopy = WriteQueue.back().Data; + full = ResultBuilder->WriteYsonData(std::move(dataCopy), [this, &exceedRows](const TString& rawYson) { if (RowsLimit && Rows + 1 > *RowsLimit) { exceedRows = true; return false; diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 24d6d6f38bb..60515cba7e2 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -182,7 +182,7 @@ public: auto serializedResultType = GetSerializedResultType(lambda); NYql::NDqs::TProtoBuilder protoBuilder(serializedResultType, columns); - result.Data = protoBuilder.BuildYson(rows); + result.Data = protoBuilder.BuildYson(std::move(rows)); AddCounter("LocalRun", TInstant::Now() - t); diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index f8e290e2584..619c27ffdee 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ 
b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -373,7 +373,7 @@ public: if (request.GetString().empty() && request.GetChunks() == 0) { NDq::TDqSerializedBatch batch; batch.Proto = std::move(*request.MutableData()); - dataSerializer.Deserialize(batch, source->GetInputType(), buffer); + dataSerializer.Deserialize(std::move(batch), source->GetInputType(), buffer); } else if (!request.GetString().empty()) { for (auto& row : request.GetString()) { buffer.emplace_back(NKikimr::NMiniKQL::MakeString(row)); diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index fd0e333206d..24f20fabb76 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -1124,7 +1124,7 @@ public: TDqSerializedBatch serialized; serialized.Proto = std::move(*response.MutableData()); - dataSerializer.Deserialize(serialized, GetOutputType(), batch); + dataSerializer.Deserialize(std::move(serialized), GetOutputType(), batch); return response.GetBytes(); } catch (...) { |