aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraozeritsky <aozeritsky@ydb.tech>2023-11-10 15:09:41 +0300
committeraozeritsky <aozeritsky@ydb.tech>2023-11-10 17:20:17 +0300
commitc540d4e186b249a7d218d745b29d42731913f2e7 (patch)
tree7c18c74da6f0d86059c5604e10116726ee807dda
parentdbbda4251f0a9a6a01b3942d9094fc2ac49d7b71 (diff)
downloadydb-c540d4e186b249a7d218d745b29d42731913f2e7.tar.gz
Add Push/Pop with SerializedBatch to interface
-rw-r--r--ydb/library/yql/dq/runtime/dq_async_input.cpp4
-rw-r--r--ydb/library/yql/dq/runtime/dq_async_input.h4
-rw-r--r--ydb/library/yql/dq/runtime/dq_async_output.cpp5
-rw-r--r--ydb/library/yql/dq/runtime/dq_async_output.h3
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp10
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h12
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp4
7 files changed, 22 insertions, 20 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_async_input.cpp b/ydb/library/yql/dq/runtime/dq_async_input.cpp
index 5acfc3e3d1..7d515e5cb3 100644
--- a/ydb/library/yql/dq/runtime/dq_async_input.cpp
+++ b/ydb/library/yql/dq/runtime/dq_async_input.cpp
@@ -37,6 +37,10 @@ public:
AddBatch(std::move(batch), space);
}
}
+
+ virtual void Push(TDqSerializedBatch&&, i64) override {
+ YQL_ENSURE(!"Unimplemented");
+ }
};
IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer(
diff --git a/ydb/library/yql/dq/runtime/dq_async_input.h b/ydb/library/yql/dq/runtime/dq_async_input.h
index 3ef75dec31..939f278709 100644
--- a/ydb/library/yql/dq/runtime/dq_async_input.h
+++ b/ydb/library/yql/dq/runtime/dq_async_input.h
@@ -1,5 +1,6 @@
#pragma once
#include "dq_input.h"
+#include "dq_transport.h"
namespace NYql::NDq {
@@ -19,7 +20,8 @@ public:
virtual const TDqAsyncInputBufferStats& GetPushStats() const = 0;
virtual void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) = 0;
-
+ virtual void Push(TDqSerializedBatch&& batch, i64 space) = 0;
+
virtual void Finish() = 0;
};
diff --git a/ydb/library/yql/dq/runtime/dq_async_output.cpp b/ydb/library/yql/dq/runtime/dq_async_output.cpp
index e2bbf1f089..5a617d71ca 100644
--- a/ydb/library/yql/dq/runtime/dq_async_output.cpp
+++ b/ydb/library/yql/dq/runtime/dq_async_output.cpp
@@ -135,6 +135,11 @@ public:
return usedBytes;
}
+ virtual ui64 Pop(TDqSerializedBatch&, ui64) override {
+ YQL_ENSURE(!"Unimplemented");
+ return 0;
+ }
+
bool Pop(NDqProto::TWatermark& watermark) override {
if (!Values.empty() && std::holds_alternative<NDqProto::TWatermark>(Values.front().Value)) {
watermark = std::move(std::get<NDqProto::TWatermark>(Values.front().Value));
diff --git a/ydb/library/yql/dq/runtime/dq_async_output.h b/ydb/library/yql/dq/runtime/dq_async_output.h
index 0375b6c9eb..6f044124d0 100644
--- a/ydb/library/yql/dq/runtime/dq_async_output.h
+++ b/ydb/library/yql/dq/runtime/dq_async_output.h
@@ -1,5 +1,6 @@
#pragma once
#include "dq_output.h"
+#include "dq_transport.h"
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
@@ -24,6 +25,8 @@ public:
// Pop data to send. Return estimated size of returned data.
[[nodiscard]]
virtual ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, ui64 bytes) = 0;
+ [[nodiscard]]
+ virtual ui64 Pop(TDqSerializedBatch&, ui64 bytes) = 0;
// Pop watermark
[[nodiscard]]
virtual bool Pop(NDqProto::TWatermark& watermark) = 0;
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 8f03119808..73e25ac49c 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -670,7 +670,7 @@ private:
TDqInputStats PopStats;
};
-class TDqSource: public IStringSource {
+class TDqSource: public IDqAsyncInputBuffer {
public:
TDqSource(ui64 taskId, ui64 inputIndex, TType* inputType, IPipeTaskRunner* taskRunner)
: TaskId(taskId)
@@ -721,7 +721,7 @@ public:
ythrow yexception() << "unimplemented";
}
- void PushString(NDq::TDqSerializedBatch&& serialized, i64 space) override {
+ void Push(NDq::TDqSerializedBatch&& serialized, i64 space) override {
YQL_ENSURE(!serialized.IsOOB());
NDqProto::TSourcePushRequest data;
*data.MutableData() = std::move(serialized.Proto);
@@ -740,7 +740,7 @@ public:
auto inputType = GetInputType();
TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0);
TDqSerializedBatch serialized = dataSerializer.Serialize(batch, inputType);
- PushString(std::move(serialized), space);
+ Push(std::move(serialized), space);
}
[[nodiscard]]
@@ -1027,7 +1027,7 @@ private:
TDqOutputChannelStats PopStats;
};
-class TDqSink : public IStringSink {
+class TDqSink : public IDqAsyncOutputBuffer {
public:
TDqSink(ui64 taskId, ui64 outputIndex, TType* type, IPipeTaskRunner* taskRunner)
: TaskId(taskId)
@@ -1051,7 +1051,7 @@ public:
return PopStats;
}
- ui64 PopString(NDq::TDqSerializedBatch& batch, ui64 bytes) override {
+ ui64 Pop(NDq::TDqSerializedBatch& batch, ui64 bytes) override {
try {
NDqProto::TCommandHeader header;
header.SetVersion(5);
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
index 5739e96dec..1883d3a115 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h
@@ -12,18 +12,6 @@ extern const TString WorkingDirectoryParamName;
extern const TString WorkingDirectoryDontInitParamName; // COMPAT(aozeritsky)
extern const TString UseMetaParamName; // COMPAT(aozeritsky)
-class IStringSource: public NDq::IDqAsyncInputBuffer {
-public:
- virtual ~IStringSource() = default;
- virtual void PushString(NDq::TDqSerializedBatch&& batch, i64 space) = 0;
-};
-
-class IStringSink: public NDq::IDqAsyncOutputBuffer {
-public:
- virtual ~IStringSink() = default;
- virtual ui64 PopString(NDq::TDqSerializedBatch& batch, ui64 bytes) = 0;
-};
-
class IInputChannel : public TThrRefBase, private TNonCopyable {
public:
using TPtr = TIntrusivePtr<IInputChannel>;
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index d1fd247dda..00436a3065 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -267,7 +267,7 @@ private:
try {
// auto guard = taskRunner->BindAllocator(); // only for local mode
auto source = taskRunner->GetSource(index);
- (static_cast<NTaskRunnerProxy::IStringSource*>(source.Get()))->PushString(std::move(serialized), space);
+ source->Push(std::move(serialized), space);
if (finish) {
source->Finish();
}
@@ -397,7 +397,7 @@ private:
i64 size = 0;
i64 checkpointSize = 0;
if (ev->Get()->Size > 0) {
- size = (static_cast<NTaskRunnerProxy::IStringSink*>(sink.Get()))->PopString(batch, ev->Get()->Size);
+ size = sink->Pop(batch, ev->Get()->Size);
}
bool hasCheckpoint = sink->Pop(checkpoint);
if (hasCheckpoint) {