diff options
| author | d-mokhnatkin <[email protected]> | 2022-11-04 00:36:24 +0300 |
|---|---|---|
| committer | d-mokhnatkin <[email protected]> | 2022-11-04 00:36:24 +0300 |
| commit | 6f61e21ce1ec50faaae313dd52b1d8578f7a924f (patch) | |
| tree | ce818601caf30b6559d02cf4db16c08d094bb8e5 | |
| parent | 8d062747c7d76338322f5d60d9b7713ec147233a (diff) | |
fix watermarks hunging
3 files changed, 13 insertions, 4 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 2c2dda745ed..53991f8d6b7 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -475,7 +475,8 @@ private: auto sourcesState = GetSourcesState(); auto status = ev->Get()->RunStatus; - CA_LOG_T("Resume execution, run status: " << status << " checkpoint: " << (bool) ev->Get()->ProgramState); + CA_LOG_T("Resume execution, run status: " << status << " checkpoint: " << (bool) ev->Get()->ProgramState + << " watermark injected: " << ev->Get()->WatermarkInjectedToOutputs); for (const auto& [channelId, freeSpace] : ev->Get()->InputChannelFreeSpace) { auto it = InputChannelsMap.find(channelId); diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index 123bc2783ae..2ab4508a4db 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -174,9 +174,13 @@ private: THolder<NDqProto::TMiniKqlProgramState> mkqlProgramState; if (res == ERunStatus::PendingInput || res == ERunStatus::Finished) { if (shouldHandleWatermark) { + const auto watermarkRequested = ev->Get()->WatermarkRequest->Watermark; + LOG_T("Task runner. Watermarks. Injecting requested watermark " << watermarkRequested + << " to " << ev->Get()->WatermarkRequest->ChannelIds.size() << " outputs "); + for (const auto& channelId : ev->Get()->WatermarkRequest->ChannelIds) { NDqProto::TWatermark watermark; - watermark.SetTimestampUs(ev->Get()->WatermarkRequest->Watermark.MicroSeconds()); + watermark.SetTimestampUs(watermarkRequested.MicroSeconds()); TaskRunner->GetOutputChannel(channelId)->Push(std::move(watermark)); } @@ -347,6 +351,10 @@ private: ResumeInputs(); break; } + + if (hasWatermark) { + break; + } } Send( diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index 2bd914ea2de..10b1b0805c3 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -28,8 +28,8 @@ struct TDqSettings { static constexpr ui64 OutputChunkMaxSize = 4_MB; static constexpr ui64 ChunkSizeLimit = 128_MB; static constexpr bool EnableDqReplicate = false; - static constexpr bool WatermarksGranularityMs = 1000; - static constexpr bool WatermarksLateArrivalDelayMs = 5000; + static constexpr ui64 WatermarksGranularityMs = 1000; + static constexpr ui64 WatermarksLateArrivalDelayMs = 5000; static constexpr ui64 ParallelOperationsLimit = 16; }; |
