summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authord-mokhnatkin <[email protected]>2022-11-04 00:36:24 +0300
committerd-mokhnatkin <[email protected]>2022-11-04 00:36:24 +0300
commit6f61e21ce1ec50faaae313dd52b1d8578f7a924f (patch)
treece818601caf30b6559d02cf4db16c08d094bb8e5
parent8d062747c7d76338322f5d60d9b7713ec147233a (diff)
fix watermarks hunging
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp3
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp10
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h4
3 files changed, 13 insertions, 4 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 2c2dda745ed..53991f8d6b7 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -475,7 +475,8 @@ private:
auto sourcesState = GetSourcesState();
auto status = ev->Get()->RunStatus;
- CA_LOG_T("Resume execution, run status: " << status << " checkpoint: " << (bool) ev->Get()->ProgramState);
+ CA_LOG_T("Resume execution, run status: " << status << " checkpoint: " << (bool) ev->Get()->ProgramState
+ << " watermark injected: " << ev->Get()->WatermarkInjectedToOutputs);
for (const auto& [channelId, freeSpace] : ev->Get()->InputChannelFreeSpace) {
auto it = InputChannelsMap.find(channelId);
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index 123bc2783ae..2ab4508a4db 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -174,9 +174,13 @@ private:
THolder<NDqProto::TMiniKqlProgramState> mkqlProgramState;
if (res == ERunStatus::PendingInput || res == ERunStatus::Finished) {
if (shouldHandleWatermark) {
+ const auto watermarkRequested = ev->Get()->WatermarkRequest->Watermark;
+ LOG_T("Task runner. Watermarks. Injecting requested watermark " << watermarkRequested
+ << " to " << ev->Get()->WatermarkRequest->ChannelIds.size() << " outputs ");
+
for (const auto& channelId : ev->Get()->WatermarkRequest->ChannelIds) {
NDqProto::TWatermark watermark;
- watermark.SetTimestampUs(ev->Get()->WatermarkRequest->Watermark.MicroSeconds());
+ watermark.SetTimestampUs(watermarkRequested.MicroSeconds());
TaskRunner->GetOutputChannel(channelId)->Push(std::move(watermark));
}
@@ -347,6 +351,10 @@ private:
ResumeInputs();
break;
}
+
+ if (hasWatermark) {
+ break;
+ }
}
Send(
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index 2bd914ea2de..10b1b0805c3 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -28,8 +28,8 @@ struct TDqSettings {
static constexpr ui64 OutputChunkMaxSize = 4_MB;
static constexpr ui64 ChunkSizeLimit = 128_MB;
static constexpr bool EnableDqReplicate = false;
- static constexpr bool WatermarksGranularityMs = 1000;
- static constexpr bool WatermarksLateArrivalDelayMs = 5000;
+ static constexpr ui64 WatermarksGranularityMs = 1000;
+ static constexpr ui64 WatermarksLateArrivalDelayMs = 5000;
static constexpr ui64 ParallelOperationsLimit = 16;
};