diff options
author | aozeritsky <aozeritsky@ydb.tech> | 2023-11-10 15:09:41 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@ydb.tech> | 2023-11-10 17:20:17 +0300 |
commit | c540d4e186b249a7d218d745b29d42731913f2e7 (patch) | |
tree | 7c18c74da6f0d86059c5604e10116726ee807dda | |
parent | dbbda4251f0a9a6a01b3942d9094fc2ac49d7b71 (diff) | |
download | ydb-c540d4e186b249a7d218d745b29d42731913f2e7.tar.gz |
Add Push/Pop with SerializedBatch to interface
7 files changed, 22 insertions, 20 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_async_input.cpp b/ydb/library/yql/dq/runtime/dq_async_input.cpp index 5acfc3e3d1..7d515e5cb3 100644 --- a/ydb/library/yql/dq/runtime/dq_async_input.cpp +++ b/ydb/library/yql/dq/runtime/dq_async_input.cpp @@ -37,6 +37,10 @@ public: AddBatch(std::move(batch), space); } } + + virtual void Push(TDqSerializedBatch&&, i64) override { + YQL_ENSURE(!"Unimplemented"); + } }; IDqAsyncInputBuffer::TPtr CreateDqAsyncInputBuffer( diff --git a/ydb/library/yql/dq/runtime/dq_async_input.h b/ydb/library/yql/dq/runtime/dq_async_input.h index 3ef75dec31..939f278709 100644 --- a/ydb/library/yql/dq/runtime/dq_async_input.h +++ b/ydb/library/yql/dq/runtime/dq_async_input.h @@ -1,5 +1,6 @@ #pragma once #include "dq_input.h" +#include "dq_transport.h" namespace NYql::NDq { @@ -19,7 +20,8 @@ public: virtual const TDqAsyncInputBufferStats& GetPushStats() const = 0; virtual void Push(NKikimr::NMiniKQL::TUnboxedValueBatch&& batch, i64 space) = 0; - + virtual void Push(TDqSerializedBatch&& batch, i64 space) = 0; + virtual void Finish() = 0; }; diff --git a/ydb/library/yql/dq/runtime/dq_async_output.cpp b/ydb/library/yql/dq/runtime/dq_async_output.cpp index e2bbf1f089..5a617d71ca 100644 --- a/ydb/library/yql/dq/runtime/dq_async_output.cpp +++ b/ydb/library/yql/dq/runtime/dq_async_output.cpp @@ -135,6 +135,11 @@ public: return usedBytes; } + virtual ui64 Pop(TDqSerializedBatch&, ui64) override { + YQL_ENSURE(!"Unimplemented"); + return 0; + } + bool Pop(NDqProto::TWatermark& watermark) override { if (!Values.empty() && std::holds_alternative<NDqProto::TWatermark>(Values.front().Value)) { watermark = std::move(std::get<NDqProto::TWatermark>(Values.front().Value)); diff --git a/ydb/library/yql/dq/runtime/dq_async_output.h b/ydb/library/yql/dq/runtime/dq_async_output.h index 0375b6c9eb..6f044124d0 100644 --- a/ydb/library/yql/dq/runtime/dq_async_output.h +++ b/ydb/library/yql/dq/runtime/dq_async_output.h @@ -1,5 +1,6 @@ #pragma once #include "dq_output.h" +#include "dq_transport.h" #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> @@ -24,6 +25,8 @@ public: // Pop data to send. Return estimated size of returned data. [[nodiscard]] virtual ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, ui64 bytes) = 0; + [[nodiscard]] + virtual ui64 Pop(TDqSerializedBatch&, ui64 bytes) = 0; // Pop watermark [[nodiscard]] virtual bool Pop(NDqProto::TWatermark& watermark) = 0; diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 8f03119808..73e25ac49c 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -670,7 +670,7 @@ private: TDqInputStats PopStats; }; -class TDqSource: public IStringSource { +class TDqSource: public IDqAsyncInputBuffer { public: TDqSource(ui64 taskId, ui64 inputIndex, TType* inputType, IPipeTaskRunner* taskRunner) : TaskId(taskId) @@ -721,7 +721,7 @@ public: ythrow yexception() << "unimplemented"; } - void PushString(NDq::TDqSerializedBatch&& serialized, i64 space) override { + void Push(NDq::TDqSerializedBatch&& serialized, i64 space) override { YQL_ENSURE(!serialized.IsOOB()); NDqProto::TSourcePushRequest data; *data.MutableData() = std::move(serialized.Proto); @@ -740,7 +740,7 @@ public: auto inputType = GetInputType(); TDqDataSerializer dataSerializer(TaskRunner->GetTypeEnv(), TaskRunner->GetHolderFactory(), NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0); TDqSerializedBatch serialized = dataSerializer.Serialize(batch, inputType); - PushString(std::move(serialized), space); + Push(std::move(serialized), space); } [[nodiscard]] @@ -1027,7 +1027,7 @@ private: TDqOutputChannelStats PopStats; }; -class TDqSink : public IStringSink { +class TDqSink : public IDqAsyncOutputBuffer { public: TDqSink(ui64 taskId, ui64 outputIndex, TType* type, IPipeTaskRunner* taskRunner) : TaskId(taskId) @@ -1051,7 +1051,7 @@ public: return PopStats; } - ui64 PopString(NDq::TDqSerializedBatch& batch, ui64 bytes) override { + ui64 Pop(NDq::TDqSerializedBatch& batch, ui64 bytes) override { try { NDqProto::TCommandHeader header; header.SetVersion(5); diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h index 5739e96dec..1883d3a115 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h @@ -12,18 +12,6 @@ extern const TString WorkingDirectoryParamName; extern const TString WorkingDirectoryDontInitParamName; // COMPAT(aozeritsky) extern const TString UseMetaParamName; // COMPAT(aozeritsky) -class IStringSource: public NDq::IDqAsyncInputBuffer { -public: - virtual ~IStringSource() = default; - virtual void PushString(NDq::TDqSerializedBatch&& batch, i64 space) = 0; -}; - -class IStringSink: public NDq::IDqAsyncOutputBuffer { -public: - virtual ~IStringSink() = default; - virtual ui64 PopString(NDq::TDqSerializedBatch& batch, ui64 bytes) = 0; -}; - class IInputChannel : public TThrRefBase, private TNonCopyable { public: using TPtr = TIntrusivePtr<IInputChannel>; diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index d1fd247dda..00436a3065 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -267,7 +267,7 @@ private: try { // auto guard = taskRunner->BindAllocator(); // only for local mode auto source = taskRunner->GetSource(index); - (static_cast<NTaskRunnerProxy::IStringSource*>(source.Get()))->PushString(std::move(serialized), space); + source->Push(std::move(serialized), space); if (finish) { source->Finish(); } @@ -397,7 +397,7 @@ private: i64 size = 0; i64 checkpointSize = 0; if (ev->Get()->Size > 0) { - size = (static_cast<NTaskRunnerProxy::IStringSink*>(sink.Get()))->PopString(batch, ev->Get()->Size); + size = sink->Pop(batch, ev->Get()->Size); } bool hasCheckpoint = sink->Pop(checkpoint); if (hasCheckpoint) { |