about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: aozeritsky <aozeritsky@ydb.tech> 2023-11-07 18:51:14 +0300
committer: aozeritsky <aozeritsky@ydb.tech> 2023-11-07 19:25:20 +0300
commit: aa4bbf0349f5ee93d10e133c1d79cb5151f853f0 (patch)
tree: ec42dfccaa82ef690ed3ed630f9fc35bccb91686
parent: 6fb44d264c2506caaab59ed462d729da9fbe7653 (diff)
download: ydb-aa4bbf0349f5ee93d10e133c1d79cb5151f853f0.tar.gz
Better sink data serialization for pipe
-rw-r--r-- ydb/library/yql/dq/actors/task_runner/events.h 2
-rw-r--r-- ydb/library/yql/providers/dq/api/protos/task_command_executor.proto 2
-rw-r--r-- ydb/library/yql/providers/dq/runtime/task_command_executor.cpp 20
-rw-r--r-- ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp 8
-rw-r--r-- ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h 2
-rw-r--r-- ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp 11
6 files changed, 17 insertions, 28 deletions
diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h
index 4156628806..14e37747db 100644
--- a/ydb/library/yql/dq/actors/task_runner/events.h
+++ b/ydb/library/yql/dq/actors/task_runner/events.h
@@ -389,7 +389,7 @@ struct TEvSinkPopFinished
{ }
const ui64 Index;
- TVector<TString> Strings;
+ NDq::TDqSerializedBatch Batch;
TMaybe<NDqProto::TCheckpoint> Checkpoint;
i64 Size;
i64 CheckpointSize;
diff --git a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
index 85e01537f4..f1bac98b9b 100644
--- a/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
+++ b/ydb/library/yql/providers/dq/api/protos/task_command_executor.proto
@@ -75,14 +75,12 @@ message TMeteringStatsResponse {
message TSinkPopRequest {
uint64 Bytes = 1;
- bool Raw = 2;
};
message TSinkPopResponse {
uint64 Bytes = 1;
TData Data = 2;
repeated TMetric Metric = 3;
- repeated bytes String = 4;
}
message TIsFinishedResponse {
diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
index d3b48bef8b..6a9f2eb8ab 100644
--- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
+++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp
@@ -608,19 +608,13 @@ public:
auto bytes = sink->Pop(batch, request.GetBytes());
NDqProto::TSinkPopResponse response;
- if (request.GetRaw()) {
- batch.ForEachRow([&response](const auto& value) {
- *response.AddString() = value.AsStringRef();
- });
- } else {
- NDq::TDqDataSerializer dataSerializer(
- Runner->GetTypeEnv(),
- Runner->GetHolderFactory(),
- NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
- NDq::TDqSerializedBatch serialized = dataSerializer.Serialize(batch, outputType);
- YQL_ENSURE(!serialized.IsOOB());
- *response.MutableData() = std::move(serialized.Proto);
- }
+ NDq::TDqDataSerializer dataSerializer(
+ Runner->GetTypeEnv(),
+ Runner->GetHolderFactory(),
+ NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
+ NDq::TDqSerializedBatch serialized = dataSerializer.Serialize(batch, outputType);
+ YQL_ENSURE(!serialized.IsOOB());
+ *response.MutableData() = std::move(serialized.Proto);
response.SetBytes(bytes);
response.Save(&output);
break;
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 8932fe43c9..8f03119808 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1051,7 +1051,7 @@ public:
return PopStats;
}
- ui64 PopString(TVector<TString>& batch, ui64 bytes) override {
+ ui64 PopString(NDq::TDqSerializedBatch& batch, ui64 bytes) override {
try {
NDqProto::TCommandHeader header;
header.SetVersion(5);
@@ -1062,16 +1062,12 @@ public:
NDqProto::TSinkPopRequest request;
request.SetBytes(bytes);
- request.SetRaw(true);
header.Save(&Output);
NDqProto::TSinkPopResponse response;
response.Load(&Input);
- for (auto& row : response.GetString()) {
- batch.emplace_back(std::move(row));
- }
-
+ batch.Proto = std::move(*response.MutableData());
return response.GetBytes();
} catch (...) {
TaskRunner->RaiseException();
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
index ccda9e629b..5739e96dec 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
@@ -21,7 +21,7 @@ public:
class IStringSink: public NDq::IDqAsyncOutputBuffer {
public:
virtual ~IStringSink() = default;
- virtual ui64 PopString(TVector<TString>& batch, ui64 bytes) = 0;
+ virtual ui64 PopString(NDq::TDqSerializedBatch& batch, ui64 bytes) = 0;
};
class IInputChannel : public TThrRefBase, private TNonCopyable {
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index 421f9ecb2e..d1fd247dda 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -366,9 +366,10 @@ private:
void OnSinkPopFinished(TEvSinkPopFinished::TPtr& ev) {
auto guard = TaskRunner->BindAllocator();
NKikimr::NMiniKQL::TUnboxedValueBatch batch;
- for (auto& row: ev->Get()->Strings) {
- batch.emplace_back(NKikimr::NMiniKQL::MakeString(row));
- }
+ auto sink = TaskRunner->GetSink(ev->Get()->Index);
+ TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
+ dataSerializer.Deserialize(std::move(ev->Get()->Batch), sink->GetOutputType(), batch);
+
Parent->SinkSend(
ev->Get()->Index,
std::move(batch),
@@ -390,7 +391,7 @@ private:
try {
// auto guard = taskRunner->BindAllocator(); // only for local mode
auto sink = taskRunner->GetSink(ev->Get()->Index);
- TVector<TString> batch;
+ NDq::TDqSerializedBatch batch;
NDqProto::TCheckpoint checkpoint;
TMaybe<NDqProto::TCheckpoint> maybeCheckpoint;
i64 size = 0;
@@ -408,7 +409,7 @@ private:
auto event = MakeHolder<TEvSinkPopFinished>(
ev->Get()->Index,
std::move(maybeCheckpoint), size, checkpointSize, finished, changed);
- event->Strings = std::move(batch);
+ event->Batch = std::move(batch);
// repack data and forward
actorSystem->Send(
new IEventHandle(