aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraozeritsky <aozeritsky@yandex-team.ru>2022-02-08 15:41:56 +0300
committeraozeritsky <aozeritsky@yandex-team.ru>2022-02-08 15:41:56 +0300
commit1fafdf10d7f7a82fc6440f9358df4a144af23596 (patch)
tree405ed11f9f5db4e93107e9383822b61b6a619a25
parent2efaaefec2cb2a55d55c01753d1eed2a3296adb5 (diff)
downloadydb-1fafdf10d7f7a82fc6440f9358df4a144af23596.tar.gz
DQ21: Async compute actor
ref:dae4df4bb40098f8244a252d8e71d4474c719a37
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp416
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h27
-rw-r--r--ydb/library/yql/dq/actors/compute/ya.make1
-rw-r--r--ydb/library/yql/providers/dq/actors/compute_actor.cpp14
4 files changed, 454 insertions, 4 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
new file mode 100644
index 0000000000..6f5f46abe6
--- /dev/null
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -0,0 +1,416 @@
+#include "dq_compute_actor.h"
+#include "dq_async_compute_actor.h"
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
+
+#include <ydb/library/yql/dq/common/dq_common.h>
+
+namespace NYql {
+namespace NDq {
+
+using namespace NActors;
+
+namespace {
+
+bool IsDebugLogEnabled(const TActorSystem* actorSystem) {
+ auto* settings = actorSystem->LoggerSettings();
+ return settings && settings->Satisfies(NActors::NLog::EPriority::PRI_DEBUG, NKikimrServices::KQP_TASKS_RUNNER);
+}
+
+} // anonymous namespace
+
+class TDqAsyncComputeActor : public TDqComputeActorBase<TDqAsyncComputeActor>
+ , public NTaskRunnerActor::ITaskRunnerActor::ICallbacks
+{
+
+ using TBase = TDqComputeActorBase<TDqAsyncComputeActor>;
+
+public:
+ static constexpr char ActorName[] = "DQ_COMPUTE_ACTOR";
+
+ TDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
+ const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory)
+ : TBase(executerId, txId, std::move(task), std::move(sourceActorFactory), std::move(sinkActorFactory), settings, memoryLimits)
+ , TaskRunnerActorFactory(taskRunnerActorFactory)
+ {}
+
+ void DoBootstrap() {
+ const TActorSystem* actorSystem = TlsActivationContext->ActorSystem();
+
+ TLogFunc logger;
+ if (IsDebugLogEnabled(actorSystem)) {
+ logger = [actorSystem, txId = this->GetTxId(), taskId = GetTask().GetId()] (const TString& message) {
+ LOG_DEBUG_S(*actorSystem, NKikimrServices::KQP_TASKS_RUNNER, "TxId: " << txId
+ << ", task: " << taskId << ": " << message);
+ };
+ }
+
+ NActors::IActor* actor;
+ std::tie(TaskRunnerActor, actor) = TaskRunnerActorFactory->Create(
+ this, TStringBuilder() << this->GetTxId());
+ TaskRunnerActorId = this->RegisterWithSameMailbox(actor);
+
+ TDqTaskRunnerMemoryLimits limits;
+ limits.ChannelBufferSize = MemoryLimits.ChannelBufferSize;
+ limits.OutputChunkMaxSize = GetDqExecutionSettings().FlowControl.MaxOutputChunkSize;
+
+ this->Become(&TDqAsyncComputeActor::StateFuncBase);
+
+ // TODO:
+ std::shared_ptr<IDqTaskRunnerExecutionContext> execCtx = std::shared_ptr<IDqTaskRunnerExecutionContext>(new TDqTaskRunnerExecutionContext());
+
+ this->Send(TaskRunnerActorId,
+ new NTaskRunnerActor::TEvTaskRunnerCreate(
+ GetTask(), limits, execCtx));
+ }
+
+ void FillExtraStats(NDqProto::TDqComputeActorStats* /* dst */, bool /* last */) {
+ }
+
+private:
+ STFUNC(StateFuncBase) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NTaskRunnerActor::TEvTaskRunFinished, OnRunFinished);
+ HFunc(NTaskRunnerActor::TEvSourcePushFinished, OnSourcePushFinished);
+ HFunc(NTaskRunnerActor::TEvChannelPopFinished, OnPopFinished);
+ HFunc(NTaskRunnerActor::TEvTaskRunnerCreateFinished, OnTaskRunnerCreatead);
+ HFunc(NTaskRunnerActor::TEvContinueRun, OnContinueRun); // push finished
+ default:
+ TDqComputeActorBase<TDqAsyncComputeActor>::StateFuncBase(ev, ctx);
+ };
+ };
+
+ void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) override {
+ YQL_ENSURE(!outputChannel.Finished || Checkpoints);
+
+ if (outputChannel.PopStarted) {
+ return;
+ }
+
+ const bool wasFinished = outputChannel.Finished;
+ auto channelId = outputChannel.ChannelId;
+
+ const ui32 allowedOvercommit = AllowedChannelsOvercommit();
+
+ const i64 toSend = peerState.PeerFreeSpace + allowedOvercommit - peerState.InFlightBytes;
+
+ CA_LOG_D("About to drain channelId: " << channelId
+ << ", hasPeer: " << outputChannel.HasPeer
+ << ", peerFreeSpace: " << peerState.PeerFreeSpace
+ << ", inFlightBytes: " << peerState.InFlightBytes
+ << ", inFlightRows: " << peerState.InFlightRows
+ << ", inFlightCount: " << peerState.InFlightCount
+ << ", allowedOvercommit: " << allowedOvercommit
+ << ", toSend: " << toSend
+// << ", finished: " << outputChannel.Channel->IsFinished());
+ );
+
+ outputChannel.PopStarted = true;
+ ProcessOutputsState.Inflight ++;
+ if (toSend <= 0) {
+ if (Y_UNLIKELY(outputChannel.Stats)) {
+ outputChannel.Stats->BlockedByCapacity++;
+ }
+ this->Send(this->SelfId(), new NTaskRunnerActor::TEvChannelPopFinished(channelId));
+ return;
+ }
+
+ this->Send(TaskRunnerActorId, new NTaskRunnerActor::TEvPop(channelId, wasFinished, toSend));
+ }
+
+ void DrainSink(ui64 outputIndex, TSinkInfo& sinkInfo) override {
+ if (sinkInfo.Finished && !Checkpoints) {
+ return;
+ }
+
+ if (sinkInfo.PopStarted) {
+ return;
+ }
+
+ Y_VERIFY(sinkInfo.SinkActor);
+ Y_VERIFY(sinkInfo.Actor);
+
+ const ui32 allowedOvercommit = AllowedChannelsOvercommit();
+ const i64 sinkActorFreeSpaceBeforeSend = sinkInfo.SinkActor->GetFreeSpace();
+
+ i64 toSend = sinkActorFreeSpaceBeforeSend + allowedOvercommit;
+ CA_LOG_D("About to drain sink " << outputIndex
+ << ". FreeSpace: " << sinkActorFreeSpaceBeforeSend
+ << ", allowedOvercommit: " << allowedOvercommit
+ << ", toSend: " << toSend
+ //<< ", finished: " << sinkInfo.Sink->IsFinished());
+ );
+
+ sinkInfo.PopStarted = true;
+ ProcessOutputsState.Inflight ++;
+ sinkInfo.SinkActorFreeSpaceBeforeSend = sinkActorFreeSpaceBeforeSend;
+ }
+
+ void SourcePush(NKikimr::NMiniKQL::TUnboxedValueVector&& batch, TSourceInfo& source, i64 space, bool finished) override {
+ if (space <= 0) {
+ return;
+ }
+
+ ProcessSourcesState.Inflight ++;
+ source.PushStarted = true;
+ source.Finished = finished;
+ TaskRunnerActor->SourcePush(Cookie++, source.Index, std::move(batch), space, finished);
+ }
+
+ void TakeInputChannelData(NDqProto::TChannelData&& channelData, bool ack) override {
+ TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr(channelData.GetChannelId());
+ YQL_ENSURE(inputChannel, "task: " << Task.GetId() << ", unknown input channelId: " << channelData.GetChannelId());
+
+ auto finished = channelData.GetFinished();
+
+ auto ev = (channelData.GetData().GetRows())
+ ? MakeHolder<NTaskRunnerActor::TEvPush>(
+ channelData.GetChannelId(),
+ std::move(*channelData.MutableData()),
+ finished,
+ /*askFreeSpace = */ true)
+ : MakeHolder<NTaskRunnerActor::TEvPush>(
+ channelData.GetChannelId(), finished, /*askFreeSpace = */ true);
+
+ this->Send(TaskRunnerActorId, ev.Release(), 0, Cookie);
+
+ if (channelData.HasCheckpoint()) {
+ Y_VERIFY(inputChannel->CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED);
+ Y_VERIFY(Checkpoints);
+ const auto& checkpoint = channelData.GetCheckpoint();
+ inputChannel->Pause(checkpoint);
+ Checkpoints->RegisterCheckpoint(checkpoint, channelData.GetChannelId());
+ }
+
+ TakeInputChannelDataRequests[Cookie++] = TTakeInputChannelData{ack, channelData.GetChannelId()};
+ }
+
+ void PassAway() override {
+ if (TaskRunnerActor) {
+ TaskRunnerActor->PassAway();
+ }
+ NActors::IActor::PassAway();
+ }
+
+ void DoExecuteImpl() override {
+ PollSourceActors();
+ if (ProcessSourcesState.Inflight == 0) {
+ this->Send(TaskRunnerActorId, new NTaskRunnerActor::TEvContinueRun());
+ }
+ }
+
+ bool SayHelloOnBootstrap() override {
+ return false;
+ }
+
+ i64 GetInputChannelFreeSpace(ui64 channelId) const override {
+ const TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr(channelId);
+ YQL_ENSURE(inputChannel, "task: " << Task.GetId() << ", unknown input channelId: " << channelId);
+
+ return inputChannel->FreeSpace;
+ }
+
+ i64 SourceFreeSpace(TSourceInfo& source) override {
+ return source.FreeSpace;
+ }
+
+ TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator() override {
+ return TypeEnv->BindAllocator();
+ }
+
+ std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> MaybeBindAllocator() override {
+ std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard;
+ if (TypeEnv) {
+ guard.emplace(TypeEnv->BindAllocator());
+ }
+ return guard;
+ }
+
+ void OnTaskRunnerCreatead(NTaskRunnerActor::TEvTaskRunnerCreateFinished::TPtr& ev, const NActors::TActorContext& ) {
+ const auto& secureParams = ev->Get()->SecureParams;
+ const auto& taskParams = ev->Get()->TaskParams;
+ const auto& typeEnv = ev->Get()->TypeEnv;
+ const auto& holderFactory = ev->Get()->HolderFactory;
+
+ TypeEnv = const_cast<NKikimr::NMiniKQL::TTypeEnvironment*>(&typeEnv);
+ FillChannelMaps(holderFactory, typeEnv, secureParams, taskParams);
+
+ {
+ // say "Hello" to executer
+ auto ev = MakeHolder<TEvDqCompute::TEvState>();
+ ev->Record.SetState(NDqProto::COMPUTE_STATE_EXECUTING);
+ ev->Record.SetTaskId(Task.GetId());
+
+ this->Send(ExecuterId, ev.Release(), NActors::IEventHandle::FlagTrackDelivery);
+ }
+
+ ContinueExecute();
+ }
+
+ void OnRunFinished(NTaskRunnerActor::TEvTaskRunFinished::TPtr& ev, const NActors::TActorContext& ) {
+ auto sourcesState = GetSourcesState();
+ auto status = ev->Get()->RunStatus;
+
+ CA_LOG_D("Resume execution, run status: " << status);
+
+ for (const auto& [channelId, freeSpace] : ev->Get()->InputChannelFreeSpace) {
+ auto it = InputChannelsMap.find(channelId);
+ if (it != InputChannelsMap.end()) {
+ it->second.FreeSpace = freeSpace;
+ }
+ }
+
+ if (status != ERunStatus::Finished) {
+ PollSources(std::move(sourcesState));
+ }
+
+ if ((status == ERunStatus::PendingInput || status == ERunStatus::Finished) && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved() && ReadyToCheckpoint()) {
+ Checkpoints->DoCheckpoint();
+ }
+
+ ProcessOutputsImpl(status);
+ }
+
+ void OnSourcePushFinished(NTaskRunnerActor::TEvSourcePushFinished::TPtr& ev, const NActors::TActorContext& ) {
+ auto it = SourcesMap.find(ev->Get()->Index);
+ Y_VERIFY(it != SourcesMap.end());
+ auto& source = it->second;
+ source.PushStarted = false;
+ // source.FreeSpace = ev->Get()->FreeSpace; TODO:XXX get freespace on run
+ ProcessSourcesState.Inflight--;
+ if (ProcessSourcesState.Inflight == 0) {
+ this->Send(TaskRunnerActorId, new NTaskRunnerActor::TEvContinueRun());
+ }
+ }
+
+ void OnPopFinished(NTaskRunnerActor::TEvChannelPopFinished::TPtr& ev, const NActors::TActorContext&) {
+ auto channelId = ev->Get()->ChannelId;
+ auto finished = ev->Get()->Finished;
+ auto dataWasSent = ev->Get()->Changed;
+ auto it = OutputChannelsMap.find(channelId);
+ Y_VERIFY(it != OutputChannelsMap.end());
+
+ TOutputChannelInfo& outputChannel = it->second;
+ outputChannel.Finished = finished;
+ if (finished) {
+ FinishedOutputChannels.insert(channelId);
+ }
+
+ outputChannel.PopStarted = false;
+ ProcessOutputsState.Inflight --;
+ ProcessOutputsState.HasDataToSend |= !outputChannel.Finished;
+
+ for (ui32 i = 0; i < ev->Get()->Data.size(); i++) {
+ auto& chunk = ev->Get()->Data[i];
+ NDqProto::TChannelData channelData;
+ channelData.SetChannelId(channelId);
+ // set finished only for last chunk
+ channelData.SetFinished(finished && i==ev->Get()->Data.size()-1);
+ channelData.MutableData()->Swap(&chunk);
+ Channels->SendChannelData(std::move(channelData));
+ }
+ if (ev->Get()->Data.empty() && dataWasSent) {
+ NDqProto::TChannelData channelData;
+ channelData.SetChannelId(channelId);
+ channelData.SetFinished(finished);
+ Channels->SendChannelData(std::move(channelData));
+ }
+
+ ProcessOutputsState.DataWasSent |= dataWasSent;
+
+ ProcessOutputsState.AllOutputsFinished =
+ FinishedOutputChannels.size() == OutputChannelsMap.size() &&
+ FinishedSinks.size() == SinksMap.size();
+
+ CheckRunStatus();
+ }
+
+ void OnContinueRun(NTaskRunnerActor::TEvContinueRun::TPtr& ev, const NActors::TActorContext& ) {
+ auto it = TakeInputChannelDataRequests.find(ev->Cookie);
+ YQL_ENSURE(it != TakeInputChannelDataRequests.end());
+
+ if (it->second.Ack) {
+ TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr(it->second.ChannelId);
+ Channels->SendChannelDataAck(it->second.ChannelId, inputChannel->FreeSpace);
+ }
+
+ ResumeExecution();
+ }
+
+ void SinkSend(ui64 index,
+ NKikimr::NMiniKQL::TUnboxedValueVector&& batch,
+ TMaybe<NDqProto::TCheckpoint>&& checkpoint,
+ i64 size,
+ i64 checkpointSize,
+ bool finished,
+ bool changed) override
+ {
+ auto outputIndex = index;
+ auto dataWasSent = finished || changed;
+ auto dataSize = size;
+ auto it = SinksMap.find(outputIndex);
+ Y_VERIFY(it != SinksMap.end());
+
+ TSinkInfo& sinkInfo = it->second;
+ sinkInfo.Finished = finished;
+ if (finished) {
+ FinishedSinks.insert(outputIndex);
+ }
+ if (checkpoint) {
+ CA_LOG_I("Resume inputs");
+ ResumeInputs();
+ }
+
+ sinkInfo.PopStarted = false;
+ ProcessOutputsState.Inflight --;
+ ProcessOutputsState.HasDataToSend |= !sinkInfo.Finished;
+
+ auto guard = BindAllocator();
+ sinkInfo.SinkActor->SendData(std::move(batch), size, std::move(checkpoint), finished);
+ CA_LOG_D("sink " << outputIndex << ": sent " << dataSize << " bytes of data and " << checkpointSize << " bytes of checkpoint barrier");
+
+ CA_LOG_D("Drain sink " << outputIndex
+ << ". Free space decreased: " << (sinkInfo.SinkActorFreeSpaceBeforeSend - sinkInfo.SinkActor->GetFreeSpace())
+ << ", sent data from buffer: " << dataSize);
+
+ ProcessOutputsState.DataWasSent |= dataWasSent;
+ ProcessOutputsState.AllOutputsFinished =
+ FinishedOutputChannels.size() == OutputChannelsMap.size() &&
+ FinishedSinks.size() == SinksMap.size();
+ CheckRunStatus();
+ }
+
+ NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv;
+ NTaskRunnerActor::ITaskRunnerActor* TaskRunnerActor;
+ NActors::TActorId TaskRunnerActorId;
+ NTaskRunnerActor::ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory;
+
+ THashSet<ui64> FinishedOutputChannels;
+ THashSet<ui64> FinishedSinks;
+ struct TProcessSourcesState {
+ int Inflight = 0;
+ };
+ TProcessSourcesState ProcessSourcesState;
+
+ struct TTakeInputChannelData {
+ bool Ack;
+ ui64 ChannelId;
+ };
+ THashMap<ui64, TTakeInputChannelData> TakeInputChannelDataRequests;
+ ui64 Cookie = 0;
+};
+
+
+IActor* CreateDqAsyncComputeActor(const TActorId& executerId, const TTxId& txId, NYql::NDqProto::TDqTask&& task,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
+ const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory)
+{
+ return new TDqAsyncComputeActor(executerId, txId, std::move(task), std::move(sourceActorFactory),
+ std::move(sinkActorFactory), settings, memoryLimits, taskRunnerActorFactory);
+}
+
+} // namespace NDq
+} // namespace NYql
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
new file mode 100644
index 0000000000..71c918f04c
--- /dev/null
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_sources.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_sinks.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
+#include <ydb/library/yql/dq/actors/dq_events_ids.h>
+#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
+#include <ydb/library/yql/dq/actors/task_runner/task_runner_actor.h>
+#include <ydb/library/yql/dq/common/dq_common.h>
+#include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h>
+#include <ydb/library/yql/dq/runtime/dq_tasks_runner.h>
+#include <ydb/library/yql/dq/runtime/dq_transport.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/log.h>
+
+namespace NYql {
+namespace NDq {
+
+NActors::IActor* CreateDqAsyncComputeActor(const NActors::TActorId& executerId, const TTxId& txId, NDqProto::TDqTask&& task,
+ IDqSourceActorFactory::TPtr sourceActorFactory, IDqSinkActorFactory::TPtr sinkActorFactory,
+ const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
+ const NTaskRunnerActor::ITaskRunnerActorFactory::TPtr& taskRunnerActorFactory);
+
+} // namespace NDq
+} // namespace NYql
diff --git a/ydb/library/yql/dq/actors/compute/ya.make b/ydb/library/yql/dq/actors/compute/ya.make
index 7f89fb7504..c4b20bd502 100644
--- a/ydb/library/yql/dq/actors/compute/ya.make
+++ b/ydb/library/yql/dq/actors/compute/ya.make
@@ -7,6 +7,7 @@ OWNER(
SRCS(
dq_compute_actor.cpp
+ dq_async_compute_actor.cpp
dq_compute_actor_channels.cpp
dq_compute_actor_checkpoints.cpp
dq_compute_actor_io_actors_factory.cpp
diff --git a/ydb/library/yql/providers/dq/actors/compute_actor.cpp b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
index 41cc21e6eb..a2ae923d16 100644
--- a/ydb/library/yql/providers/dq/actors/compute_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/compute_actor.cpp
@@ -1,4 +1,5 @@
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
+#include <ydb/library/yql/dq/actors/compute/dq_async_compute_actor.h>
#include <ydb/library/yql/providers/dq/api/protos/service.pb.h>
#include <ydb/library/yql/providers/dq/task_runner/tasks_runner_proxy.h>
@@ -70,10 +71,15 @@ IActor* CreateComputeActor(
memoryLimits,
taskRunnerFactory);
} else {
- // TODO: XXX
- Y_UNUSED(taskRunnerActorFactory);
- Y_VERIFY(false);
- return nullptr;
+ return NYql::NDq::CreateDqAsyncComputeActor(
+ executerId,
+ operationId,
+ std::move(task),
+ std::move(options.SourceActorFactory),
+ std::move(options.SinkActorFactory),
+ computeRuntimeSettings,
+ memoryLimits,
+ taskRunnerActorFactory);
}
}