diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-01-25 14:27:01 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-01-25 14:27:01 +0300 |
commit | 5c1d7760d1cf7316ec8683a71995dfa40dc1c88a (patch) | |
tree | 91c85ddbdd40780a27d7b4105158f2a3530cb547 | |
parent | 63a2b79b779ce3da57d43544cd3d4dcd03a91a72 (diff) | |
download | ydb-5c1d7760d1cf7316ec8683a71995dfa40dc1c88a.tar.gz |
One event TEvContinueRun instead of queue
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp | 74 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/task_runner/events.h | 2 |
2 files changed, 48 insertions, 28 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index aeaa4c4650..aa34d77470 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -275,7 +275,7 @@ private: return true; // handled channels syncronously } CA_LOG_D("DoHandleChannelsAfterFinishImpl"); - AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(GetWatermarkRequest(), std::move(req), /* checkpointOnly = */ true)); + AskContinueRun(std::move(req), /* checkpointOnly = */ true); return false; } @@ -369,7 +369,7 @@ private: TBase::PassAway(); } - TMaybe<NTaskRunnerActor::TWatermarkRequest> GetWatermarkRequest() { + TMaybe<TInstant> GetWatermarkRequest() { if (!WatermarksTracker.HasPendingWatermark()) { return Nothing(); } @@ -380,27 +380,17 @@ private: return Nothing(); } - TVector<ui32> channelIds; - for (const auto& [channelId, info] : OutputChannelsMap) { - if (info.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) { - continue; - } - - channelIds.emplace_back(channelId); - } - DqComputeActorMetrics.ReportInjectedToTaskRunnerWatermark(pendingWatermark); - return TMaybe<NTaskRunnerActor::TWatermarkRequest>({std::move(channelIds), pendingWatermark}); + return pendingWatermark; } - TMaybe<NTaskRunnerActor::TCheckpointRequest> GetCheckpointRequest() { - TMaybe<NTaskRunnerActor::TCheckpointRequest> req = Nothing(); + TMaybe<NDqProto::TCheckpoint> GetCheckpointRequest() { if (!CheckpointRequestedFromTaskRunner && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved()) { CheckpointRequestedFromTaskRunner = true; - req = {GetIds(OutputChannelsMap), GetIds(SinksMap), Checkpoints->GetPendingCheckpoint()}; + return Checkpoints->GetPendingCheckpoint(); } - return req; + return Nothing(); } void DoExecuteImpl() override { @@ -410,7 +400,7 @@ private: if (ProcessSourcesState.Inflight == 0) { auto req = GetCheckpointRequest(); CA_LOG_T("DoExecuteImpl: " << (bool) req); - AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(GetWatermarkRequest(), std::move(req), /* checkpointOnly = */ false)); + AskContinueRun(std::move(req), /* checkpointOnly = */ false); } } @@ -557,7 +547,7 @@ private: ProcessSourcesState.Inflight--; if (ProcessSourcesState.Inflight == 0) { CA_LOG_T("Send TEvContinueRun on OnAsyncInputPushFinished"); - AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(GetWatermarkRequest(), Nothing(), false)); + AskContinueRun(Nothing(), false); } } @@ -831,27 +821,54 @@ private: } } - void AskContinueRun(std::unique_ptr<NTaskRunnerActor::TEvContinueRun> continueRunEvent) { - continueRunEvent->SinkIds = GetIds(SinksMap); - continueRunEvent->InputTransformIds = GetIds(InputTransformsMap); + void AskContinueRun(TMaybe<NDqProto::TCheckpoint>&& checkpointRequest, bool checkpointOnly) { + Y_VERIFY(!checkpointOnly || !ContinueRunEvent); + if (!ContinueRunEvent) { + ContinueRunStartWaitTime = TInstant::Now(); + ContinueRunEvent = std::make_unique<NTaskRunnerActor::TEvContinueRun>(); + ContinueRunEvent->SinkIds = GetIds(SinksMap); + ContinueRunEvent->InputTransformIds = GetIds(InputTransformsMap); + } + ContinueRunEvent->CheckpointOnly = checkpointOnly; + if (TMaybe<TInstant> watermarkRequest = GetWatermarkRequest()) { + if (!ContinueRunEvent->WatermarkRequest) { + ContinueRunEvent->WatermarkRequest.ConstructInPlace(); + ContinueRunEvent->WatermarkRequest->Watermark = *watermarkRequest; + + ContinueRunEvent->WatermarkRequest->ChannelIds.reserve(OutputChannelsMap.size()); + for (const auto& [channelId, info] : OutputChannelsMap) { + if (info.WatermarksMode != NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) { + ContinueRunEvent->WatermarkRequest->ChannelIds.emplace_back(channelId); + } + } + } else { + ContinueRunEvent->WatermarkRequest->Watermark = Max(ContinueRunEvent->WatermarkRequest->Watermark, *watermarkRequest); + } + } + if (checkpointRequest) { + if (!ContinueRunEvent->CheckpointRequest) { + ContinueRunEvent->CheckpointRequest.ConstructInPlace(GetIds(OutputChannelsMap), GetIds(SinksMap), *checkpointRequest); + } else { + Y_VERIFY(ContinueRunEvent->CheckpointRequest->Checkpoint.GetGeneration() == checkpointRequest->GetGeneration()); + Y_VERIFY(ContinueRunEvent->CheckpointRequest->Checkpoint.GetId() == checkpointRequest->GetId()); + } + } if (!UseCpuQuota()) { - Send(TaskRunnerActorId, continueRunEvent.release()); + Send(TaskRunnerActorId, ContinueRunEvent.release()); return; } - ContinueRunEvents.emplace_back(std::move(continueRunEvent), TInstant::Now()); ProcessContinueRun(); } void ProcessContinueRun() { - if (!ContinueRunEvents.empty() && !CpuTimeQuotaAsked && !ContinueRunInflight) { - Send(TaskRunnerActorId, ContinueRunEvents.front().first.release()); + if (ContinueRunEvent && !CpuTimeQuotaAsked && !ContinueRunInflight) { + Send(TaskRunnerActorId, ContinueRunEvent.release()); if (CpuTimeQuotaWaitDelay) { - const TDuration quotaWaitDelay = TInstant::Now() - ContinueRunEvents.front().second; + const TDuration quotaWaitDelay = TInstant::Now() - ContinueRunStartWaitTime; CpuTimeQuotaWaitDelay->Collect(quotaWaitDelay.MilliSeconds()); } - ContinueRunEvents.pop_front(); ContinueRunInflight = true; } } @@ -918,7 +935,8 @@ private: TActorId QuoterServiceActorId; TInstant CpuTimeQuotaAsked; TDuration CpuTimeSpent; - std::deque<std::pair<std::unique_ptr<NTaskRunnerActor::TEvContinueRun>, TInstant>> ContinueRunEvents; + std::unique_ptr<NTaskRunnerActor::TEvContinueRun> ContinueRunEvent; + TInstant ContinueRunStartWaitTime; bool ContinueRunInflight = false; NMonitoring::THistogramPtr CpuTimeGetQuotaLatency; NMonitoring::THistogramPtr CpuTimeQuotaWaitDelay; diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h index 5d4ca7c182..79bb8ed258 100644 --- a/ydb/library/yql/dq/actors/task_runner/events.h +++ b/ydb/library/yql/dq/actors/task_runner/events.h @@ -284,6 +284,8 @@ struct TEvChannelPopFinished }; struct TWatermarkRequest { + TWatermarkRequest() = default; + TWatermarkRequest(TVector<ui32>&& channelIds, TInstant watermark) : ChannelIds(std::move(channelIds)) , Watermark(watermark) { |