diff options
author | aozeritsky <aozeritsky@ydb.tech> | 2023-11-07 18:51:14 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@ydb.tech> | 2023-11-07 19:25:20 +0300 |
commit | aa4bbf0349f5ee93d10e133c1d79cb5151f853f0 (patch) | |
tree | ec42dfccaa82ef690ed3ed630f9fc35bccb91686 | |
parent | 6fb44d264c2506caaab59ed462d729da9fbe7653 (diff) | |
download | ydb-aa4bbf0349f5ee93d10e133c1d79cb5151f853f0.tar.gz |
Better sink data serialization for pipe
6 files changed, 17 insertions, 28 deletions
diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h index 4156628806..14e37747db 100644 --- a/ydb/library/yql/dq/actors/task_runner/events.h +++ b/ydb/library/yql/dq/actors/task_runner/events.h @@ -389,7 +389,7 @@ struct TEvSinkPopFinished { } const ui64 Index; - TVector<TString> Strings; + NDq::TDqSerializedBatch Batch; TMaybe<NDqProto::TCheckpoint> Checkpoint; i64 Size; i64 CheckpointSize; diff --git a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto index 85e01537f4..f1bac98b9b 100644 --- a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto +++ b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto @@ -75,14 +75,12 @@ message TMeteringStatsResponse { message TSinkPopRequest { uint64 Bytes = 1; - bool Raw = 2; }; message TSinkPopResponse { uint64 Bytes = 1; TData Data = 2; repeated TMetric Metric = 3; - repeated bytes String = 4; } message TIsFinishedResponse { diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index d3b48bef8b..6a9f2eb8ab 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -608,19 +608,13 @@ public: auto bytes = sink->Pop(batch, request.GetBytes()); NDqProto::TSinkPopResponse response; - if (request.GetRaw()) { - batch.ForEachRow([&response](const auto& value) { - *response.AddString() = value.AsStringRef(); - }); - } else { - NDq::TDqDataSerializer dataSerializer( - Runner->GetTypeEnv(), - Runner->GetHolderFactory(), - NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0); - NDq::TDqSerializedBatch serialized = dataSerializer.Serialize(batch, outputType); - YQL_ENSURE(!serialized.IsOOB()); - *response.MutableData() = std::move(serialized.Proto); - } + NDq::TDqDataSerializer dataSerializer( + Runner->GetTypeEnv(), + Runner->GetHolderFactory(), + NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0); + NDq::TDqSerializedBatch serialized = dataSerializer.Serialize(batch, outputType); + YQL_ENSURE(!serialized.IsOOB()); + *response.MutableData() = std::move(serialized.Proto); response.SetBytes(bytes); response.Save(&output); break; diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 8932fe43c9..8f03119808 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -1051,7 +1051,7 @@ public: return PopStats; } - ui64 PopString(TVector<TString>& batch, ui64 bytes) override { + ui64 PopString(NDq::TDqSerializedBatch& batch, ui64 bytes) override { try { NDqProto::TCommandHeader header; header.SetVersion(5); @@ -1062,16 +1062,12 @@ public: NDqProto::TSinkPopRequest request; request.SetBytes(bytes); - request.SetRaw(true); header.Save(&Output); NDqProto::TSinkPopResponse response; response.Load(&Input); - for (auto& row : response.GetString()) { - batch.emplace_back(std::move(row)); - } - + batch.Proto = std::move(*response.MutableData()); return response.GetBytes(); } catch (...) { TaskRunner->RaiseException(); diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h index ccda9e629b..5739e96dec 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h @@ -21,7 +21,7 @@ public: class IStringSink: public NDq::IDqAsyncOutputBuffer { public: virtual ~IStringSink() = default; - virtual ui64 PopString(TVector<TString>& batch, ui64 bytes) = 0; + virtual ui64 PopString(NDq::TDqSerializedBatch& batch, ui64 bytes) = 0; }; class IInputChannel : public TThrRefBase, private TNonCopyable { diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index 421f9ecb2e..d1fd247dda 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -366,9 +366,10 @@ private: void OnSinkPopFinished(TEvSinkPopFinished::TPtr& ev) { auto guard = TaskRunner->BindAllocator(); NKikimr::NMiniKQL::TUnboxedValueBatch batch; - for (auto& row: ev->Get()->Strings) { - batch.emplace_back(NKikimr::NMiniKQL::MakeString(row)); - } + auto sink = TaskRunner->GetSink(ev->Get()->Index); + TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0); + dataSerializer.Deserialize(std::move(ev->Get()->Batch), sink->GetOutputType(), batch); + Parent->SinkSend( ev->Get()->Index, std::move(batch), @@ -390,7 +391,7 @@ private: try { // auto guard = taskRunner->BindAllocator(); // only for local mode auto sink = taskRunner->GetSink(ev->Get()->Index); - TVector<TString> batch; + NDq::TDqSerializedBatch batch; NDqProto::TCheckpoint checkpoint; TMaybe<NDqProto::TCheckpoint> maybeCheckpoint; i64 size = 0; @@ -408,7 +409,7 @@ private: auto event = MakeHolder<TEvSinkPopFinished>( ev->Get()->Index, std::move(maybeCheckpoint), size, checkpointSize, finished, changed); - event->Strings = std::move(batch); + event->Batch = std::move(batch); // repack data and forward actorSystem->Send( new IEventHandle( |