aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-01-25 14:27:01 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-01-25 14:27:01 +0300
commit5c1d7760d1cf7316ec8683a71995dfa40dc1c88a (patch)
tree91c85ddbdd40780a27d7b4105158f2a3530cb547
parent63a2b79b779ce3da57d43544cd3d4dcd03a91a72 (diff)
downloadydb-5c1d7760d1cf7316ec8683a71995dfa40dc1c88a.tar.gz
One event TEvContinueRun instead of queue
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp74
-rw-r--r--ydb/library/yql/dq/actors/task_runner/events.h2
2 files changed, 48 insertions, 28 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index aeaa4c4650..aa34d77470 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -275,7 +275,7 @@ private:
return true; // handled channels syncronously
}
CA_LOG_D("DoHandleChannelsAfterFinishImpl");
- AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(GetWatermarkRequest(), std::move(req), /* checkpointOnly = */ true));
+ AskContinueRun(std::move(req), /* checkpointOnly = */ true);
return false;
}
@@ -369,7 +369,7 @@ private:
TBase::PassAway();
}
- TMaybe<NTaskRunnerActor::TWatermarkRequest> GetWatermarkRequest() {
+ TMaybe<TInstant> GetWatermarkRequest() {
if (!WatermarksTracker.HasPendingWatermark()) {
return Nothing();
}
@@ -380,27 +380,17 @@ private:
return Nothing();
}
- TVector<ui32> channelIds;
- for (const auto& [channelId, info] : OutputChannelsMap) {
- if (info.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) {
- continue;
- }
-
- channelIds.emplace_back(channelId);
- }
-
DqComputeActorMetrics.ReportInjectedToTaskRunnerWatermark(pendingWatermark);
- return TMaybe<NTaskRunnerActor::TWatermarkRequest>({std::move(channelIds), pendingWatermark});
+ return pendingWatermark;
}
- TMaybe<NTaskRunnerActor::TCheckpointRequest> GetCheckpointRequest() {
- TMaybe<NTaskRunnerActor::TCheckpointRequest> req = Nothing();
+ TMaybe<NDqProto::TCheckpoint> GetCheckpointRequest() {
if (!CheckpointRequestedFromTaskRunner && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved()) {
CheckpointRequestedFromTaskRunner = true;
- req = {GetIds(OutputChannelsMap), GetIds(SinksMap), Checkpoints->GetPendingCheckpoint()};
+ return Checkpoints->GetPendingCheckpoint();
}
- return req;
+ return Nothing();
}
void DoExecuteImpl() override {
@@ -410,7 +400,7 @@ private:
if (ProcessSourcesState.Inflight == 0) {
auto req = GetCheckpointRequest();
CA_LOG_T("DoExecuteImpl: " << (bool) req);
- AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(GetWatermarkRequest(), std::move(req), /* checkpointOnly = */ false));
+ AskContinueRun(std::move(req), /* checkpointOnly = */ false);
}
}
@@ -557,7 +547,7 @@ private:
ProcessSourcesState.Inflight--;
if (ProcessSourcesState.Inflight == 0) {
CA_LOG_T("Send TEvContinueRun on OnAsyncInputPushFinished");
- AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(GetWatermarkRequest(), Nothing(), false));
+ AskContinueRun(Nothing(), false);
}
}
@@ -831,27 +821,54 @@ private:
}
}
- void AskContinueRun(std::unique_ptr<NTaskRunnerActor::TEvContinueRun> continueRunEvent) {
- continueRunEvent->SinkIds = GetIds(SinksMap);
- continueRunEvent->InputTransformIds = GetIds(InputTransformsMap);
+ void AskContinueRun(TMaybe<NDqProto::TCheckpoint>&& checkpointRequest, bool checkpointOnly) {
+ Y_VERIFY(!checkpointOnly || !ContinueRunEvent);
+ if (!ContinueRunEvent) {
+ ContinueRunStartWaitTime = TInstant::Now();
+ ContinueRunEvent = std::make_unique<NTaskRunnerActor::TEvContinueRun>();
+ ContinueRunEvent->SinkIds = GetIds(SinksMap);
+ ContinueRunEvent->InputTransformIds = GetIds(InputTransformsMap);
+ }
+ ContinueRunEvent->CheckpointOnly = checkpointOnly;
+ if (TMaybe<TInstant> watermarkRequest = GetWatermarkRequest()) {
+ if (!ContinueRunEvent->WatermarkRequest) {
+ ContinueRunEvent->WatermarkRequest.ConstructInPlace();
+ ContinueRunEvent->WatermarkRequest->Watermark = *watermarkRequest;
+
+ ContinueRunEvent->WatermarkRequest->ChannelIds.reserve(OutputChannelsMap.size());
+ for (const auto& [channelId, info] : OutputChannelsMap) {
+ if (info.WatermarksMode != NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) {
+ ContinueRunEvent->WatermarkRequest->ChannelIds.emplace_back(channelId);
+ }
+ }
+ } else {
+ ContinueRunEvent->WatermarkRequest->Watermark = Max(ContinueRunEvent->WatermarkRequest->Watermark, *watermarkRequest);
+ }
+ }
+ if (checkpointRequest) {
+ if (!ContinueRunEvent->CheckpointRequest) {
+ ContinueRunEvent->CheckpointRequest.ConstructInPlace(GetIds(OutputChannelsMap), GetIds(SinksMap), *checkpointRequest);
+ } else {
+ Y_VERIFY(ContinueRunEvent->CheckpointRequest->Checkpoint.GetGeneration() == checkpointRequest->GetGeneration());
+ Y_VERIFY(ContinueRunEvent->CheckpointRequest->Checkpoint.GetId() == checkpointRequest->GetId());
+ }
+ }
if (!UseCpuQuota()) {
- Send(TaskRunnerActorId, continueRunEvent.release());
+ Send(TaskRunnerActorId, ContinueRunEvent.release());
return;
}
- ContinueRunEvents.emplace_back(std::move(continueRunEvent), TInstant::Now());
ProcessContinueRun();
}
void ProcessContinueRun() {
- if (!ContinueRunEvents.empty() && !CpuTimeQuotaAsked && !ContinueRunInflight) {
- Send(TaskRunnerActorId, ContinueRunEvents.front().first.release());
+ if (ContinueRunEvent && !CpuTimeQuotaAsked && !ContinueRunInflight) {
+ Send(TaskRunnerActorId, ContinueRunEvent.release());
if (CpuTimeQuotaWaitDelay) {
- const TDuration quotaWaitDelay = TInstant::Now() - ContinueRunEvents.front().second;
+ const TDuration quotaWaitDelay = TInstant::Now() - ContinueRunStartWaitTime;
CpuTimeQuotaWaitDelay->Collect(quotaWaitDelay.MilliSeconds());
}
- ContinueRunEvents.pop_front();
ContinueRunInflight = true;
}
}
@@ -918,7 +935,8 @@ private:
TActorId QuoterServiceActorId;
TInstant CpuTimeQuotaAsked;
TDuration CpuTimeSpent;
- std::deque<std::pair<std::unique_ptr<NTaskRunnerActor::TEvContinueRun>, TInstant>> ContinueRunEvents;
+ std::unique_ptr<NTaskRunnerActor::TEvContinueRun> ContinueRunEvent;
+ TInstant ContinueRunStartWaitTime;
bool ContinueRunInflight = false;
NMonitoring::THistogramPtr CpuTimeGetQuotaLatency;
NMonitoring::THistogramPtr CpuTimeQuotaWaitDelay;
diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h
index 5d4ca7c182..79bb8ed258 100644
--- a/ydb/library/yql/dq/actors/task_runner/events.h
+++ b/ydb/library/yql/dq/actors/task_runner/events.h
@@ -284,6 +284,8 @@ struct TEvChannelPopFinished
};
struct TWatermarkRequest {
+ TWatermarkRequest() = default;
+
TWatermarkRequest(TVector<ui32>&& channelIds, TInstant watermark)
: ChannelIds(std::move(channelIds))
, Watermark(watermark) {