diff options
author | yumkam <[email protected]> | 2025-08-27 18:42:19 +0300 |
---|---|---|
committer | GitHub <[email protected]> | 2025-08-27 18:42:19 +0300 |
commit | 4bf27a67141bbfcfe0d540eb525a2f7a42b3705b (patch) | |
tree | c35edc94f75a4aa172f46ee4a2924689cfb174ac | |
parent | 04d7c1c57c7bc770a2b7b4dd5e36a6b7a4349471 (diff) |
async ca: rework watermark handling and fix inputtransform and watermark interaction (#22263)oidc-1.2.6.18-devoidc-1.2.6.0-dev
18 files changed, 932 insertions, 404 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index debfebfa793..669ffb18a99 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -182,9 +182,7 @@ private: DUMP(ProcessOutputsState, LastPopReturnedNoData); html << "<h3>Watermarks</h3>"; - for (const auto& [time, id]: WatermarkTakeInputChannelDataRequests) { - html << "WatermarkTakeInputChannelDataRequests: " << time.ToString() << " " << id << "<br />"; - } + DUMP(WatermarksTracker, GetPendingWatermark, ()); html << "<h3>CPU Quota</h3>"; html << "QuoterServiceActorId: " << QuoterServiceActorId.ToString() << "<br />"; @@ -251,7 +249,6 @@ private: DUMP(info, ChannelId); DUMP(info, SrcStageId); DUMP(info, HasPeer); - html << "PendingWatermarks: " << !info.PendingWatermarks.empty() << " " << (info.PendingWatermarks.empty() ? TString{} : info.PendingWatermarks.back().ToString()) << "<br />"; html << "WatermarksMode: " << NDqProto::EWatermarksMode_Name(info.WatermarksMode) << "<br />"; html << "PendingCheckpoint: " << info.PendingCheckpoint.has_value() << " " << (info.PendingCheckpoint ? TStringBuilder{} << info.PendingCheckpoint->GetId() << " " << info.PendingCheckpoint->GetGeneration() : TString{}) << "<br />"; html << "CheckpointingMode: " << NDqProto::ECheckpointingMode_Name(info.CheckpointingMode) << "<br />"; @@ -310,7 +307,6 @@ private: html << "DqInputBuffer.InputType: " << (buffer->GetInputType() ? buffer->GetInputType()->GetKindAsStr() : TString{"unknown"}) << "<br />"; html << "DqInputBuffer.InputWidth: " << (buffer->GetInputWidth() ? ToString(*buffer->GetInputWidth()) : TString{"unknown"}) << "<br />"; html << "DqInputBuffer.IsFinished: " << buffer->IsFinished() << "<br />"; - html << "DqInputBuffer.IsPaused: " << buffer->IsPaused() << "<br />"; html << "DqInputBuffer.IsPending: " << buffer->IsPending() << "<br />"; const auto& popStats = buffer->GetPopStats(); @@ -525,7 +521,7 @@ private: } const bool wasFinished = outputChannel.Finished; - auto channelId = outputChannel.ChannelId; + const auto channelId = outputChannel.ChannelId; const auto& peerState = Channels->GetOutputChannelInFlightState(channelId); @@ -620,18 +616,6 @@ private: TMaybe<TInstant> watermark; if (channelData.HasWatermark()) { watermark = TInstant::MicroSeconds(channelData.GetWatermark().GetTimestampUs()); - - const bool channelWatermarkChanged = WatermarksTracker.NotifyInChannelWatermarkReceived( - inputChannel->ChannelId, - *watermark - ); - - if (channelWatermarkChanged) { - CA_LOG_T("Pause input channel " << channelData.GetChannelId() << " bacause of watermark " << *watermark); - inputChannel->Pause(*watermark); - } - - WatermarkTakeInputChannelDataRequests[*watermark]++; } TDqSerializedBatch batch; @@ -647,7 +631,8 @@ private: channelData.GetChannelId(), batch.RowCount() ? std::optional{std::move(batch)} : std::nullopt, finished, - channelData.HasCheckpoint() + channelData.HasCheckpoint(), + watermark ); Send(TaskRunnerActorId, ev.Release(), 0, Cookie); @@ -681,22 +666,6 @@ private: TBase::PassAway(); } - TMaybe<TInstant> GetWatermarkRequest() { - if (!WatermarksTracker.HasPendingWatermark()) { - return Nothing(); - } - - const auto pendingWatermark = *WatermarksTracker.GetPendingWatermark(); - if (WatermarkTakeInputChannelDataRequests.contains(pendingWatermark)) { - // Not all precending to watermark input channels data has been injected - return Nothing(); - } - - MetricsReporter.ReportInjectedToTaskRunnerWatermark(pendingWatermark); - - return pendingWatermark; - } - TMaybe<NDqProto::TCheckpoint> GetCheckpointRequest() { if (!CheckpointRequestedFromTaskRunner && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved()) { CheckpointRequestedFromTaskRunner = true; @@ -806,9 +775,24 @@ private: } } - if (ev->Get()->WatermarkInjectedToOutputs && !WatermarksTracker.HasOutputChannels()) { - ResumeInputsByWatermark(*WatermarksTracker.GetPendingWatermark()); - WatermarksTracker.PopPendingWatermark(); + if (auto watermark = ev->Get()->WatermarkInjectedToOutputs) { + ResumeInputsByWatermark(*watermark); + if (WatermarksTracker.NotifyWatermarkWasSent(*watermark)) { + MetricsReporter.ReportInjectedToOutputsWatermark(*watermark); + WatermarksTracker.PopPendingWatermark(); + } + // sources or input channels was unpaused, trigger new poll + ResumeExecution(EResumeSource::CAWatermarkInject); + } + + for (auto inputChannelId : ev->Get()->FinishedInputsWithWatermarks) { + CA_LOG_T("Unregister watermarked input channel " << inputChannelId); + WatermarksTracker.UnregisterInputChannel(inputChannelId); + } + + for (auto sourceId : ev->Get()->FinishedSourcesWithWatermarks) { + CA_LOG_T("Unregister watermarked async input " << sourceId); + WatermarksTracker.UnregisterAsyncInput(sourceId); } ReadyToCheckpointFlag = (bool) ev->Get()->ProgramState; @@ -927,18 +911,6 @@ private: ProcessOutputsState.HasDataToSend |= !outputChannel.Finished; ProcessOutputsState.LastPopReturnedNoData = asyncData.Data.empty(); - if (asyncData.Watermark.Defined()) { - const auto watermark = TInstant::MicroSeconds(asyncData.Watermark->GetTimestampUs()); - const bool shouldResumeInputs = WatermarksTracker.NotifyOutputChannelWatermarkSent( - outputChannel.ChannelId, - watermark - ); - - if (shouldResumeInputs) { - ResumeInputsByWatermark(watermark); - } - } - if (!shouldSkipData) { if (asyncData.Checkpoint.Defined()) { ResumeInputsByCheckpoint(); @@ -985,28 +957,32 @@ private: } void OnInputChannelDataAck(NTaskRunnerActor::TEvInputChannelDataAck::TPtr& ev) { - auto it = TakeInputChannelDataRequests.find(ev->Cookie); + const auto it = TakeInputChannelDataRequests.find(ev->Cookie); YQL_ENSURE(it != TakeInputChannelDataRequests.end()); + const auto channelId = it->second.ChannelId; + const auto watermark = it->second.Watermark; + CA_LOG_T("Input data push finished. Cookie: " << ev->Cookie - << " Watermark: " << it->second.Watermark + << " Watermark: " << watermark << " Ack: " << it->second.Ack << " TakeInputChannelDataRequests: " << TakeInputChannelDataRequests.size() - << " WatermarkTakeInputChannelDataRequests: " << WatermarkTakeInputChannelDataRequests.size()); - - if (it->second.Watermark.Defined()) { - auto& ct = WatermarkTakeInputChannelDataRequests.at(*it->second.Watermark); - if (--ct == 0) { - WatermarkTakeInputChannelDataRequests.erase(*it->second.Watermark); - } - } + ); - TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr(it->second.ChannelId); + TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr(channelId); Y_ABORT_UNLESS(inputChannel); + inputChannel->FreeSpace = ev->Get()->FreeSpace; + if (watermark) { + if (WatermarksTracker.NotifyInChannelWatermarkReceived( channelId, *watermark)) { + CA_LOG_T("Pause input channel " << channelId << " because of watermark"); + inputChannel->Pause(*watermark); // XXX does nothing in async CA + } + } + if (it->second.Ack) { - Channels->SendChannelDataAck(it->second.ChannelId, inputChannel->FreeSpace); + Channels->SendChannelDataAck(channelId, inputChannel->FreeSpace); } TakeInputChannelDataRequests.erase(it); @@ -1155,14 +1131,6 @@ private: ContinueRunEvent = std::make_unique<NTaskRunnerActor::TEvContinueRun>(); } ContinueRunEvent->CheckpointOnly = checkpointOnly; - if (TMaybe<TInstant> watermarkRequest = GetWatermarkRequest()) { - if (!ContinueRunEvent->WatermarkRequest) { - ContinueRunEvent->WatermarkRequest.ConstructInPlace(); - ContinueRunEvent->WatermarkRequest->Watermark = *watermarkRequest; - } else { - ContinueRunEvent->WatermarkRequest->Watermark = Max(ContinueRunEvent->WatermarkRequest->Watermark, *watermarkRequest); - } - } if (checkpointRequest) { if (!ContinueRunEvent->CheckpointRequest) { ContinueRunEvent->CheckpointRequest.ConstructInPlace(*checkpointRequest); @@ -1171,6 +1139,11 @@ private: Y_ABORT_UNLESS(ContinueRunEvent->CheckpointRequest->Checkpoint.GetId() == checkpointRequest->GetId()); } } + if (auto watermarkRequest = WatermarksTracker.GetPendingWatermark()) { + Y_ENSURE(*watermarkRequest >= ContinueRunEvent->WatermarkRequest); + ContinueRunEvent->WatermarkRequest = *watermarkRequest; + MetricsReporter.ReportInjectedToTaskRunnerWatermark(*watermarkRequest); + } if (!UseCpuQuota()) { Send(TaskRunnerActorId, ContinueRunEvent.release()); @@ -1240,9 +1213,6 @@ private: TMaybe<TInstant> Watermark; }; THashMap<ui64, TTakeInputChannelData> TakeInputChannelDataRequests; - // Watermark should be injected to task runner only after all precending data is injected - // This hash map will help to track the right moment - THashMap<TInstant, ui32> WatermarkTakeInputChannelDataRequests; ui64 Cookie = 0; NDq::TDqTaskRunnerStatsView TaskRunnerStats; bool ReadyToCheckpointFlag; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h index 22aa7e6f586..42bc5e8916b 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h @@ -45,8 +45,9 @@ public: } void ResumeByWatermark(TInstant watermark) { - YQL_ENSURE(watermark == PendingWatermark); - PendingWatermark = Nothing(); + if (watermark >= PendingWatermark) { + PendingWatermark = Nothing(); + } } virtual i64 GetFreeSpace() const = 0; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 9f805d82591..037766a7fa3 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -55,6 +55,7 @@ enum class EResumeSource : ui32 { CADataSent, CAPendingOutput, CATaskRunnerCreated, + CAWatermarkInject, Last, }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index dbf4141f953..7618a3119cc 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -764,6 +764,7 @@ protected: //TDqComputeActorCheckpoints::ICallbacks sourceInfo.ResumeByWatermark(watermark); } + // XXX Does nothing in async CA, not used (yet) in sync CA for (auto& [id, channelInfo] : InputChannelsMap) { if (channelInfo.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) { continue; @@ -839,7 +840,6 @@ protected: ui32 SrcStageId; IDqInputChannel::TPtr Channel; bool HasPeer = false; - std::queue<TInstant> PendingWatermarks; const NDqProto::EWatermarksMode WatermarksMode; std::optional<NDqProto::TCheckpoint> PendingCheckpoint; const NDqProto::ECheckpointingMode CheckpointingMode; @@ -860,13 +860,15 @@ protected: } bool IsPaused() const { - return !PendingWatermarks.empty() || PendingCheckpoint.has_value(); + return PendingCheckpoint.has_value(); } void Pause(TInstant watermark) { YQL_ENSURE(WatermarksMode != NDqProto::WATERMARKS_MODE_DISABLED); - PendingWatermarks.emplace(watermark); + if (Channel) { + Channel->AddWatermark(watermark); + } } void Pause(const NDqProto::TCheckpoint& checkpoint) { @@ -874,24 +876,20 @@ protected: YQL_ENSURE(CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED); PendingCheckpoint = checkpoint; if (Channel) { // async actor doesn't hold channels, so channel is paused in task runner actor - Channel->Pause(); + Channel->PauseByCheckpoint(); } } void ResumeByWatermark(TInstant watermark) { - while (!PendingWatermarks.empty() && PendingWatermarks.front() <= watermark) { - if (PendingWatermarks.front() != watermark) { - CA_LOG_W("Input channel " << ChannelId << - " watermarks were collapsed. See YQ-1441. Dropped watermark: " << PendingWatermarks.front()); - } - PendingWatermarks.pop(); + if (Channel) { + Channel->ResumeByWatermark(watermark); } } void ResumeByCheckpoint() { PendingCheckpoint.reset(); if (Channel) { // async actor doesn't hold channels, so channel is resumed in task runner actor - Channel->Resume(); + Channel->ResumeByCheckpoint(); } } @@ -1609,18 +1607,11 @@ protected: void InitializeTask() { for (ui32 i = 0; i < Task.InputsSize(); ++i) { const auto& inputDesc = Task.GetInputs(i); + auto watermarksMode = NDqProto::WATERMARKS_MODE_DISABLED; Y_ABORT_UNLESS(!inputDesc.HasSource() || inputDesc.ChannelsSize() == 0); // HasSource => no channels - if (inputDesc.HasTransform()) { - auto result = InputTransformsMap.emplace( - i, - TAsyncInputTransformHelper(LogPrefix, i, NDqProto::WATERMARKS_MODE_DISABLED) - ); - YQL_ENSURE(result.second); - } - if (inputDesc.HasSource()) { - const auto watermarksMode = inputDesc.GetSource().GetWatermarksMode(); + watermarksMode = inputDesc.GetSource().GetWatermarksMode(); auto result = SourcesMap.emplace( i, static_cast<TDerived*>(this)->CreateInputHelper(LogPrefix, i, watermarksMode) @@ -1628,18 +1619,30 @@ protected: YQL_ENSURE(result.second); } else { for (auto& channel : inputDesc.GetChannels()) { + auto channelWatermarksMode = channel.GetWatermarksMode(); + if (channelWatermarksMode != NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) { + watermarksMode = channelWatermarksMode; + } auto result = InputChannelsMap.emplace( channel.GetId(), TInputChannelInfo( LogPrefix, channel.GetId(), channel.GetSrcStageId(), - channel.GetWatermarksMode(), + channelWatermarksMode, channel.GetCheckpointingMode()) ); YQL_ENSURE(result.second); } } + + if (inputDesc.HasTransform()) { + auto result = InputTransformsMap.emplace( + i, + TAsyncInputTransformHelper(LogPrefix, i, watermarksMode) + ); + YQL_ENSURE(result.second); + } } for (ui32 i = 0; i < Task.OutputsSize(); ++i) { @@ -1694,12 +1697,6 @@ private: WatermarksTracker.RegisterInputChannel(id); } } - - for (const auto& [id, channel] : OutputChannelsMap) { - if (channel.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DEFAULT) { - WatermarksTracker.RegisterOutputChannel(id); - } - } } virtual const NYql::NDq::TDqTaskRunnerStats* GetTaskRunnerStats() = 0; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp index 91b60286afa..020441a11f7 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp @@ -136,10 +136,10 @@ void TDqComputeActorMetrics::ReportInjectedToOutputsWatermark(TInstant watermark } InjectedToOutputsWatermark->Set(watermark.MilliSeconds()); - auto iter = WatermarkStartedAt.find(watermark); - if (iter != WatermarkStartedAt.end()) { - WatermarkCollectLatency->Collect((TInstant::Now() - iter->second).MilliSeconds()); - WatermarkStartedAt.erase(iter); + auto iter = WatermarkStartedAt.upper_bound(watermark); + if (iter != WatermarkStartedAt.begin()) { + WatermarkCollectLatency->Collect((TInstant::Now() - WatermarkStartedAt.begin()->second).MilliSeconds()); + WatermarkStartedAt.erase(WatermarkStartedAt.begin(), iter); } } @@ -162,9 +162,13 @@ NMonitoring::TDynamicCounterPtr TDqComputeActorMetrics::GetInputChannelCounters( } void TDqComputeActorMetrics::ReportInputWatermarkMetrics(NMonitoring::TDynamicCounterPtr& counters, TInstant watermark) { + if (!Enable) { + return; + } counters->GetCounter("watermark_ms")->Set(watermark.MilliSeconds()); - if (!WatermarkStartedAt.contains(watermark)) { - WatermarkStartedAt[watermark] = TInstant::Now(); + auto [it, inserted] = WatermarkStartedAt.emplace(watermark, TInstant::Zero()); + if (inserted) { + it->second = TInstant::Now(); } } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h index 6044acd65d2..1a8022e04dd 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h @@ -55,7 +55,7 @@ private: NMonitoring::TDynamicCounters::TCounterPtr AsyncInputError; NMonitoring::TDynamicCounters::TCounterPtr OtherEvent; - THashMap<TInstant, TInstant> WatermarkStartedAt; + TMap<TInstant, TInstant> WatermarkStartedAt; }; } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp index 98c5c9d8ab3..a067ae7e366 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp @@ -31,26 +31,34 @@ void TDqComputeActorWatermarks::RegisterAsyncInput(ui64 inputId) { AsyncInputsWatermarks[inputId] = Nothing(); } -void TDqComputeActorWatermarks::RegisterInputChannel(ui64 inputId) { - InputChannelsWatermarks[inputId] = Nothing(); +void TDqComputeActorWatermarks::UnregisterAsyncInput(ui64 inputId) { + auto found = AsyncInputsWatermarks.erase(inputId); + Y_ENSURE(found); + RecalcPendingWatermark(); } -void TDqComputeActorWatermarks::RegisterOutputChannel(ui64 outputId) { - OutputChannelsWatermarks[outputId] = Nothing(); +void TDqComputeActorWatermarks::RegisterInputChannel(ui64 inputId) { + InputChannelsWatermarks[inputId] = Nothing(); } -bool TDqComputeActorWatermarks::HasOutputChannels() const { - return !OutputChannelsWatermarks.empty(); +void TDqComputeActorWatermarks::UnregisterInputChannel(ui64 inputId) { + auto found = InputChannelsWatermarks.erase(inputId); + Y_ENSURE(found); + RecalcPendingWatermark(); } bool TDqComputeActorWatermarks::NotifyAsyncInputWatermarkReceived(ui64 inputId, TInstant watermark) { + auto it = AsyncInputsWatermarks.find(inputId); + if (it == AsyncInputsWatermarks.end()) { + LOG_D("Ignored watermark notification on unregistered async input " << inputId); + return false; + } + LOG_T("Async input " << inputId << " notified about watermark " << watermark); - auto& asyncInputWatermark = AsyncInputsWatermarks[inputId]; - if (!asyncInputWatermark || *asyncInputWatermark < watermark) { + auto& asyncInputWatermark = it->second; + if (UpdateAndRecalcPendingWatermark(asyncInputWatermark, watermark)) { LOG_T("Async input " << inputId << " watermark was updated to " << watermark); - asyncInputWatermark = watermark; - RecalcPendingWatermark(); return true; } @@ -58,38 +66,60 @@ bool TDqComputeActorWatermarks::NotifyAsyncInputWatermarkReceived(ui64 inputId, } bool TDqComputeActorWatermarks::NotifyInChannelWatermarkReceived(ui64 inputId, TInstant watermark) { + auto it = InputChannelsWatermarks.find(inputId); + if (it == InputChannelsWatermarks.end()) { + LOG_D("Ignored watermark notification on unregistered input channel" << inputId); + return false; + } + LOG_T("Input channel " << inputId << " notified about watermark " << watermark); - auto& inputChannelWatermark = InputChannelsWatermarks[inputId]; - if (!inputChannelWatermark || *inputChannelWatermark < watermark) { + auto& inputChannelWatermark = it->second; + if (UpdateAndRecalcPendingWatermark(inputChannelWatermark, watermark)) { LOG_T("Input channel " << inputId << " watermark was updated to " << watermark); - inputChannelWatermark = watermark; - RecalcPendingWatermark(); return true; } return false; } -bool TDqComputeActorWatermarks::NotifyOutputChannelWatermarkSent(ui64 outputId, TInstant watermark) { - auto logPrefix = TStringBuilder() << "Output channel " - << outputId << " notified about watermark '" << watermark << "'"; +// Modifies and optionally recalc pending watermarks +bool TDqComputeActorWatermarks::UpdateAndRecalcPendingWatermark(TMaybe<TInstant>& storedWatermark, TInstant watermark) { + if (storedWatermark < watermark) { + auto oldWatermark = std::exchange(storedWatermark, watermark); + if (LastWatermark == oldWatermark) { + // LastWatermark was unset; old watermark value was unset + // -> it is possible now all channels have watermark set, needs recalc + // LastWatermark was set and same as old watermark value + // -> it is possible LastWatermark can be advanced, needs recalc + RecalcPendingWatermark(); + } else { + // otherwise LastWatermark will be same + } + return true; + } + return false; +} - LOG_T(logPrefix); +bool TDqComputeActorWatermarks::NotifyWatermarkWasSent(TInstant watermark) { + auto logPrefix = [watermark] { + return TStringBuilder() << "Output notified about watermark '" << watermark << "'"; + }; - if (watermark <= LastWatermark) { - LOG_E(logPrefix << "' when LastWatermark was already forwarded to " << *LastWatermark); - // We will try to ignore this error, but something strange happened + LOG_T(logPrefix()); + + if (watermark < PendingWatermark) { + LOG_D(logPrefix() << " before '" << PendingWatermark << "'"); + return false; } - if (watermark != PendingWatermark) { - LOG_E(logPrefix << " when '" << PendingWatermark << "' was expected"); + if (watermark > PendingWatermark) { + LOG_E(logPrefix() << " when '" << PendingWatermark << "' was expected"); // We will try to ignore this error, but something strange happened + return false; } - OutputChannelsWatermarks[outputId] = watermark; - - return MaybePopPendingWatermark(); + return true; } bool TDqComputeActorWatermarks::HasPendingWatermark() const { @@ -122,41 +152,13 @@ void TDqComputeActorWatermarks::RecalcPendingWatermark() { newWatermark = std::min(newWatermark, *watermark); } - if (!LastWatermark || newWatermark != LastWatermark) { + if (newWatermark > LastWatermark) { LOG_T("New pending watermark " << newWatermark); PendingWatermark = newWatermark; + LastWatermark = newWatermark; } } -bool TDqComputeActorWatermarks::MaybePopPendingWatermark() { - if (OutputChannelsWatermarks.empty()) { - return true; - } - - if (!PendingWatermark) { - LOG_E("There is no pending watermark, but pop was called"); - // We will try to ignore this error, but something strange happened - return true; - } - - auto outWatermark = TInstant::Max(); - for (const auto& [_, watermark] : OutputChannelsWatermarks) { - if (!watermark) { - return false; - } - - outWatermark = std::min(outWatermark, *watermark); - } - - if (outWatermark >= *PendingWatermark) { - LastWatermark = PendingWatermark; - PopPendingWatermark(); - return true; - } - - return false; -} - void TDqComputeActorWatermarks::PopPendingWatermark() { LOG_T("Watermark " << *PendingWatermark << " was popped. "); PendingWatermark = Nothing(); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h index 1ac5d5326c2..1f7ca1b691c 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h @@ -13,8 +13,9 @@ public: void RegisterAsyncInput(ui64 inputId); void RegisterInputChannel(ui64 inputId); - void RegisterOutputChannel(ui64 outputId); - bool HasOutputChannels() const; + + void UnregisterAsyncInput(ui64 inputId); + void UnregisterInputChannel(ui64 inputId); // Will return true, if local watermark inside this async input was moved forward. // CA should pause this async input and wait for coresponding watermarks in all other sources/inputs. @@ -24,9 +25,8 @@ public: // CA should pause this input channel and wait for coresponding watermarks in all other sources/inputs. bool NotifyInChannelWatermarkReceived(ui64 inputId, TInstant watermark); - // Will return true, if watermark was sent to all registered outputs. - // CA should resume inputs and sources in this case - bool NotifyOutputChannelWatermarkSent(ui64 outputId, TInstant watermark); + // Will return true, if pending watermark completed. + bool NotifyWatermarkWasSent(TInstant watermark); bool HasPendingWatermark() const; TMaybe<TInstant> GetPendingWatermark() const; @@ -36,6 +36,7 @@ public: private: void RecalcPendingWatermark(); + bool UpdateAndRecalcPendingWatermark(TMaybe<TInstant>& storedWatermark, TInstant watermark); bool MaybePopPendingWatermark(); private: @@ -43,7 +44,6 @@ private: std::unordered_map<ui64, TMaybe<TInstant>> AsyncInputsWatermarks; std::unordered_map<ui64, TMaybe<TInstant>> InputChannelsWatermarks; - std::unordered_map<ui64, TMaybe<TInstant>> OutputChannelsWatermarks; TMaybe<TInstant> PendingWatermark; TMaybe<TInstant> LastWatermark; diff --git a/ydb/library/yql/dq/actors/compute/ut/dq_async_compute_actor_ut.cpp b/ydb/library/yql/dq/actors/compute/ut/dq_async_compute_actor_ut.cpp index 215e2d6e1ed..d8aaa41b646 100644 --- a/ydb/library/yql/dq/actors/compute/ut/dq_async_compute_actor_ut.cpp +++ b/ydb/library/yql/dq/actors/compute/ut/dq_async_compute_actor_ut.cpp @@ -1,5 +1,7 @@ #include <library/cpp/testing/unittest/registar.h> +#include <util/string/cast.h> + #include <ydb/library/actors/testlib/test_runtime.h> #include <ydb/library/actors/core/actor_bootstrapped.h> #include <ydb/library/services/services.pb.h> @@ -37,8 +39,8 @@ namespace NYql::NDq { namespace { static const bool TESTS_VERBOSE = getenv("TESTS_VERBOSE") != nullptr; -#define LOG_D(stream) LOG_DEBUG_S(*ActorSystem.SingleSys(), NKikimrServices::KQP_COMPUTE, LogPrefix << stream); -#define LOG_E(stream) LOG_ERROR_S(*ActorSystem.SingleSys(), NKikimrServices::KQP_COMPUTE, LogPrefix << stream); +#define LOG_D(stream) LOG_DEBUG_S(*ActorSystem.SingleSys(), NKikimrServices::KQP_COMPUTE, LogPrefix << stream) +#define LOG_E(stream) LOG_ERROR_S(*ActorSystem.SingleSys(), NKikimrServices::KQP_COMPUTE, LogPrefix << stream) struct TMockHttpRequest : NMonitoring::IMonHttpRequest { TStringStream Out; TCgiParameters Params; @@ -111,6 +113,7 @@ struct TActorSystem: NActors::TTestActorRuntimeBase { if (TESTS_VERBOSE) { SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::EPriority::PRI_TRACE); + SetLogPriority(NKikimrServices::DQ_TASK_RUNNER, NActors::NLog::EPriority::PRI_TRACE); } } }; @@ -125,8 +128,8 @@ NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory() { } struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { - static constexpr ui64 InputChannelId = 1; - static constexpr ui64 OutputChannelId = 2; + static constexpr ui64 InputChannelId = 1000; + static constexpr ui64 OutputChannelId = 2000; static constexpr ui32 InputStageId = 123; static constexpr ui32 ThisStageId = 456; static constexpr ui32 OutputStageId = 789; @@ -137,7 +140,7 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { static constexpr i32 MaxTransformedValue = 10; TActorSystem ActorSystem; TActorId EdgeActor; - TActorId SrcEdgeActor; + std::unordered_map<ui64, TActorId> SrcEdgeActor; // ChannelId -> actor TActorId DstEdgeActor; TScopedAlloc Alloc; @@ -196,7 +199,6 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { ActorSystem.Start(); EdgeActor = ActorSystem.AllocateEdgeActor(); - SrcEdgeActor = ActorSystem.AllocateEdgeActor(); DstEdgeActor = ActorSystem.AllocateEdgeActor(); } @@ -388,13 +390,16 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { // Adds dummy input channel with channelId // returns IDqOutputChannel::TPtr that can be used to inject data/checkpoints/watermarks into channel - auto AddDummyInputChannel(NDqProto::TDqTask& task, ui64 channelId) { - auto& input = *task.AddInputs(); + auto AddDummyInputChannel(NDqProto::TTaskInput& input, ui64 channelId) { auto& channel = *input.AddChannels(); input.MutableUnionAll(); // for side-effect channel.SetId(channelId); + const auto& [srcEdgeActor, inserted] = SrcEdgeActor.try_emplace(channelId); + if (inserted) { + srcEdgeActor->second = ActorSystem.AllocateEdgeActor(); + } auto& chEndpoint = *channel.MutableSrcEndpoint(); - ActorIdToProto(SrcEdgeActor, chEndpoint.MutableActorId()); + ActorIdToProto(srcEdgeActor->second, chEndpoint.MutableActorId()); channel.SetWatermarksMode(NDqProto::WATERMARKS_MODE_DEFAULT); channel.SetCheckpointingMode(NDqProto::CHECKPOINTING_MODE_DEFAULT); channel.SetInMemory(true); @@ -412,12 +417,29 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { TDqOutputChannelSettings settings; settings.TransportVersion = TransportVersion; settings.MutableSettings.IsLocalChannel = true; + settings.Level = TCollectStatsLevel::Profile; return CreateDqOutputChannel(channelId, ThisStageId, (IsWide ? static_cast<TType*>(WideRowType) : RowType), HolderFactory, settings, logFunc); } + auto AddDummyInputChannel(NDqProto::TDqTask& task, ui64 channelId) { + auto& input = *task.AddInputs(); + return AddDummyInputChannel(input, channelId); + } + + auto AddDummyInputChannels(NDqProto::TDqTask& task, ui64 baseChannelId, ui64 numChannels) { + auto& input = *task.AddInputs(); + TVector<IDqOutputChannel::TPtr> fakeOutputs; + + for (; numChannels--; ++baseChannelId) { + fakeOutputs.push_back(AddDummyInputChannel(input, baseChannelId)); + } + + return fakeOutputs; + } + // Adds dummy output channel with channelId // returns IDqInputChannel::TPtr that can be used to simulating reading from this channel auto AddDummyOutputChannel(NDqProto::TDqTask& task, ui64 channelId, TType* type) { @@ -443,14 +465,14 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { ThisStageId, type, 10_MB, - TCollectStatsLevel::None, + TCollectStatsLevel::Profile, TypeEnv, HolderFactory, TransportVersion, NKikimr::NMiniKQL::EValuePackerVersion::V0); } - auto CreateTestAsyncCA(NDqProto::TDqTask& task) { + auto CreateTestAsyncCA(NDqProto::TDqTask& task, NDqProto::EDqStatsMode statsMode = NDqProto::DQ_STATS_MODE_PROFILE) { TVector<NKikimr::NMiniKQL::TComputationNodeFactory> compNodeFactories = { NYql::GetCommonDqFactory(), NKikimr::NMiniKQL::GetYqlFactory() @@ -474,17 +496,19 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { }); TComputeMemoryLimits memoryLimits; memoryLimits.ChannelBufferSize = 1_MB; - memoryLimits.MkqlLightProgramMemoryLimit = 10_MB; - memoryLimits.MkqlHeavyProgramMemoryLimit = 20_MB; - memoryLimits.MkqlProgramHardMemoryLimit = 30_MB; + memoryLimits.MkqlLightProgramMemoryLimit = 20_MB; + memoryLimits.MkqlHeavyProgramMemoryLimit = 30_MB; + memoryLimits.MkqlProgramHardMemoryLimit = 40_MB; memoryLimits.MemoryQuotaManager = std::make_shared<TGuaranteeQuotaManager>(64_MB, 32_MB); + TComputeRuntimeSettings runtimeSettings; + runtimeSettings.StatsMode = statsMode; auto actor = CreateDqAsyncComputeActor( EdgeActor, // executerId, LogPrefix, &task, // NYql::NDqProto::TDqTask* task, CreateAsyncIoFactory(), FunctionRegistry.Get(), - {}, // TComputeRuntimeSettings& settings, + runtimeSettings, memoryLimits, taskRunnerActorFactory, {}, // ::NMonitoring::TDynamicCounterPtr taskCounters, @@ -496,6 +520,7 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { } TUnboxedValueBatch CreateRow(ui32 value, ui64 ts) { + LOG_D("create " << value << " " << ts); if (IsWide) { TUnboxedValueBatch result(WideRowType); result.PushRow([&](ui32 idx) { @@ -521,13 +546,21 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { } } - bool ReceiveData(auto&& cb, auto&& cbWatermark, auto dqInputChannel) { - auto ev = ActorSystem.GrabEdgeEvent<TEvDqCompute::TEvChannelData>({DstEdgeActor}); + // cb(TUnboxedValue& value, ui32 column) is called for each value in a row + // cbWatermark(TInstant watermark) is called for each received watermark + // beforeFinalAck() is called before sending final ack (when CA is still definitely alive) + bool ReceiveData(auto&& cb, auto&& cbWatermark, auto&& beforeFinalAck, auto dqInputChannel) { + auto ev = ActorSystem.GrabEdgeEvent<TEvDqCompute::TEvChannelData>({DstEdgeActor}, TDuration::Seconds(20)); + if (!ev) { + throw yexception() << "Failed"; + } LOG_D("Got " << ev->Get()->Record.DebugString()); TDqSerializedBatch sbatch; - sbatch.Proto = ev->Get()->Record.GetChannelData().GetData(); + auto& channelData = *ev->Get()->Record.MutableChannelData(); + sbatch.Proto = std::move(*channelData.MutableData()); dqInputChannel->Push(std::move(sbatch)); - if (ev->Get()->Record.GetChannelData().GetFinished()) { + bool finished = channelData.GetFinished(); + if (finished) { dqInputChannel->Finish(); } TUnboxedValueBatch batch; @@ -566,22 +599,34 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { LOG_D("null"); UNIT_ASSERT(false); } + LOG_D("/"); return true; })) { return false; } } } - if (ev->Get()->Record.GetChannelData().HasWatermark()) { - auto watermark = TInstant::MicroSeconds(ev->Get()->Record.GetChannelData().GetWatermark().GetTimestampUs()); + if (channelData.HasWatermark()) { + auto watermark = TInstant::MicroSeconds(channelData.GetWatermark().GetTimestampUs()); cbWatermark(watermark); } + if (dqInputChannel->IsFinished()) { + beforeFinalAck(); + } + if (!ev->Get()->Record.GetNoAck()) { + auto ack = new TEvDqCompute::TEvChannelDataAck; + ack->Record.SetChannelId(channelData.GetChannelId()); + ack->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); + ack->Record.SetFreeSpace(3123); // XXX simulates limited channel size + ack->Record.SetFinish(channelData.GetFinished()); + ActorSystem.Send(ev->Sender, ev->Recipient, ack); + } return !dqInputChannel->IsFinished(); } void WaitForChannelDataAck(auto channelId, auto seqNo) { for (;;) { - auto ev = ActorSystem.GrabEdgeEvent<TEvDqCompute::TEvChannelDataAck>({SrcEdgeActor}); + auto ev = ActorSystem.GrabEdgeEvent<TEvDqCompute::TEvChannelDataAck>(SrcEdgeActor[channelId]); LOG_D("Got ack " << ev->Get()->Record); UNIT_ASSERT_EQUAL(ev->Get()->Record.GetChannelId(), channelId); if (ev->Get()->Record.GetSeqNo() == seqNo) { @@ -606,111 +651,191 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { } } - void BasicTests(ui32 packets, bool doWatermark, bool waitIntermediateAcks) { - LogPrefix = TStringBuilder() << "Square Test for:" - << " packets=" << packets - << " doWatermark=" << doWatermark - << " waitIntermediateAcks=" << waitIntermediateAcks - << " "; - NDqProto::TDqTask task; - GenerateSquareProgram(task, [](auto& ctx) { - return ctx.template MakeType<TDataExprType>(EDataSlot::Int32); - }); - auto dqOutputChannel = AddDummyInputChannel(task, InputChannelId); - auto dqInputChannel = AddDummyOutputChannel(task, OutputChannelId, (IsWide ? static_cast<TType*>(WideRowType) : RowType)); + static constexpr ui32 NoAckPeriod = 2; - auto asyncCA = CreateTestAsyncCA(task); - ActorSystem.EnableScheduleForActor(asyncCA, true); - ui32 seqNo = 0; - ui32 val = 0; + void SendData(auto&& generator, const auto& asyncCA, ui32 packets, bool waitIntermediateAcks) { for (ui32 packet = 1; packet <= packets; ++packet) { bool isFinal = packet == packets; - bool noAck = (packet % 2) == 0; // set noAck on even packets + bool noAck = (packet % NoAckPeriod) == 0; // set noAck on even packets - PushRow(CreateRow(++val, packet), dqOutputChannel); - PushRow(CreateRow(++val, packet), dqOutputChannel); - PushRow(CreateRow(++val, packet), dqOutputChannel); - if (doWatermark) { - NDqProto::TWatermark watermark; - watermark.SetTimestampUs(TInstant::Seconds(packet).MicroSeconds()); - dqOutputChannel->Push(std::move(watermark)); - } + auto [dqOutputChannel, seqNo] = generator(packet, isFinal); if (isFinal) { dqOutputChannel->Finish(); } auto evInputChannelData = MakeHolder<TEvDqCompute::TEvChannelData>(); - evInputChannelData->Record.SetSeqNo(++seqNo); - evInputChannelData->Record.SetNoAck(noAck); + evInputChannelData->Record.SetSeqNo(++*seqNo); auto& chData = *evInputChannelData->Record.MutableChannelData(); + auto channelId = dqOutputChannel->GetChannelId(); + chData.SetChannelId(channelId); if (TDqSerializedBatch serializedBatch; dqOutputChannel->Pop(serializedBatch)) { *chData.MutableData() = serializedBatch.Proto; Y_ENSURE(serializedBatch.Payload.Empty()); // TODO } if (NDqProto::TWatermark watermark; dqOutputChannel->Pop(watermark)) { *chData.MutableWatermark() = watermark; + noAck = false; // packet containing watermark must be acked } if (NDqProto::TCheckpoint checkpoint; dqOutputChannel->Pop(checkpoint)) { *chData.MutableCheckpoint() = checkpoint; + noAck = false; // packet containing checkpoint must be acked + } + if (dqOutputChannel->IsFinished()) { + chData.SetFinished(true); + noAck = false; // final packet must be acked } - chData.SetChannelId(InputChannelId); - chData.SetFinished(dqOutputChannel->IsFinished()); + evInputChannelData->Record.SetNoAck(noAck); LOG_D("Sending " << packet << "/" << packets << " " << chData); - ActorSystem.Send(asyncCA, SrcEdgeActor, evInputChannelData.Release()); - if ((isFinal || waitIntermediateAcks) && !noAck) { - WaitForChannelDataAck(InputChannelId, seqNo); + ActorSystem.Send(asyncCA, SrcEdgeActor[channelId], evInputChannelData.Release()); + if ((dqOutputChannel->IsFinished() || waitIntermediateAcks) && !noAck) { + WaitForChannelDataAck(dqOutputChannel->GetChannelId(), *seqNo); } } + } - TMap<ui32, ui32> receivedData; - TMaybe<TInstant> watermark; - while (ReceiveData( - [this, &receivedData, &watermark](const NUdf::TUnboxedValue& val, ui32 column) { - UNIT_ASSERT(!!val); - UNIT_ASSERT(val.IsEmbedded()); - if (RowType->GetMemberName(column) == "ts") { - auto ts = val.Get<ui64>(); - if (watermark) { - UNIT_ASSERT_GT(ts, watermark->Seconds()); - } - return true; - } - UNIT_ASSERT_EQUAL(RowType->GetMemberName(column), "id"); - auto data = val.Get<i32>(); - LOG_D(data); - ++receivedData[data]; - return true; - }, - [this, &watermark](const auto& receivedWatermark) { - watermark = receivedWatermark; - LOG_D("Got watermark " << *watermark); - }, - dqInputChannel)) - {} - DumpMonPage(asyncCA, [this](auto&& str) { - UNIT_ASSERT_STRING_CONTAINS(str, "<h3>Sources</h3>"); - UNIT_ASSERT_STRING_CONTAINS(str, LogPrefix); - // TODO add validation - LOG_D(str); - }); - UNIT_ASSERT_EQUAL(receivedData.size(), val); - for (; val > 0; --val) { - UNIT_ASSERT_EQUAL_C(receivedData[val * val], 1, "expected count for " << (val * val)); - } + void SendFinish(const auto& asyncCA, auto dqOutputChannel, auto* seqNo) { + auto evInputChannelData = MakeHolder<TEvDqCompute::TEvChannelData>(); + evInputChannelData->Record.SetSeqNo(++*seqNo); + auto& chData = *evInputChannelData->Record.MutableChannelData(); + auto channelId = dqOutputChannel->GetChannelId(); + chData.SetChannelId(channelId); + chData.SetFinished(true); + evInputChannelData->Record.SetNoAck(false); + LOG_D("Sending FINISH " << chData); + ActorSystem.Send(asyncCA, SrcEdgeActor[channelId], evInputChannelData.Release()); + WaitForChannelDataAck(dqOutputChannel->GetChannelId(), *seqNo); } #if 0 // TODO: switch when inputtransform will be fixed; just log for now #define WEAK_UNIT_ASSERT_GT_C UNIT_ASSERT_GT_C +#define WEAK_UNIT_ASSERT_LE_C UNIT_ASSERT_LE_C +#define WEAK_UNIT_ASSERT_EQUAL_C UNIT_ASSERT_EQUAL_C #define WEAK_UNIT_ASSERT UNIT_ASSERT #else #define WEAK_UNIT_ASSERT_GT_C(A, B, C) do { if (!((A) > (B))) LOG_E("Assert " #A " > " #B " failed " << C); } while(0) +#define WEAK_UNIT_ASSERT_LE_C(A, B, C) do { if (!((A) <= (B))) LOG_E("Assert " #A " <= " #B " failed " << C); } while(0) +#define WEAK_UNIT_ASSERT_EQUAL_C(A, B, C) do { if (!((A) == (B))) LOG_E("Assert " #A " == " #B " failed " << C); } while(0) #define WEAK_UNIT_ASSERT(A) do { if (!(A)) LOG_E("Assert " #A " failed "); } while(0) #endif - void InputTransformTests(ui32 packets, bool doWatermark, bool waitIntermediateAcks) { + void BasicMultichannelTests(ui32 packets, ui32 watermarkPeriod, bool waitIntermediateAcks, ui32 numChannels, auto& rng, NDqProto::EDqStatsMode statsMode = NDqProto::DQ_STATS_MODE_PROFILE) { + LogPrefix = TStringBuilder() << "Square Test for:" + << " packets=" << packets + << " watermarkPeriod=" << watermarkPeriod + << " waitIntermediateAcks=" << waitIntermediateAcks + << " channels=" << numChannels + << " "; + NDqProto::TDqTask task; + GenerateSquareProgram(task, [](auto& ctx) { + return ctx.template MakeType<TDataExprType>(EDataSlot::Int32); + }); + auto dqOutputChannels = AddDummyInputChannels(task, InputChannelId, numChannels); + auto dqInputChannel = AddDummyOutputChannel(task, OutputChannelId, (IsWide ? static_cast<TType*>(WideRowType) : RowType)); + + auto asyncCA = CreateTestAsyncCA(task, statsMode); + ActorSystem.EnableScheduleForActor(asyncCA, true); + ActorSystem.GrabEdgeEvent<TEvDqCompute::TEvState>(EdgeActor); + + ui32 val = 0; + TMaybe<TInstant> expectedWatermark; + TVector<ui32> seqNo(numChannels); + TVector<ui64> activeChannels(numChannels); + std::iota(activeChannels.begin(), activeChannels.end(), 0); + SendData([&](auto packet, bool isFinal) { + auto channelIdxIdx = rng() % activeChannels.size(); + std::swap(activeChannels[channelIdxIdx], activeChannels.back()); + auto channelIdx = activeChannels.back(); + auto dqOutputChannel = dqOutputChannels[channelIdx]; + PushRow(CreateRow(++val, packet), dqOutputChannel); + PushRow(CreateRow(++val, packet), dqOutputChannel); + PushRow(CreateRow(++val, packet), dqOutputChannel); + if (watermarkPeriod && packet % watermarkPeriod == 0) { + LOG_D("push watermark" << packet); + NDqProto::TWatermark watermark; + watermark.SetTimestampUs(TInstant::Seconds(packet).MicroSeconds()); + dqOutputChannel->Push(std::move(watermark)); + expectedWatermark = std::max(expectedWatermark, TMaybe<TInstant>(TInstant::Seconds(packet))); + } + if (isFinal || activeChannels.size() > 1 && rng() % std::max(packets/numChannels, ui32{1}) == 0) { + // when we have more than one active channels left, we may randomly finish it midway + dqOutputChannel->Finish(); + activeChannels.pop_back(); + } + return std::pair { dqOutputChannel, &seqNo[channelIdx] }; + }, + asyncCA, packets, waitIntermediateAcks); + // Finish all unfinished channels (when we have more than one channel left, only one is forcibly finished on final packet) + for (ui32 channelIdx = 0; channelIdx < numChannels; ++channelIdx) { + auto dqOutputChannel = dqOutputChannels[channelIdx]; + if (dqOutputChannel->IsFinished()) { + continue; + } + SendFinish(asyncCA, dqOutputChannel, &seqNo[channelIdx]); + } + + TMap<ui32, ui32> receivedData; + TMaybe<TInstant> watermark; + try { + while (ReceiveData( + [this, &receivedData, &watermark](const NUdf::TUnboxedValue& val, ui32 column) { + UNIT_ASSERT(!!val); + UNIT_ASSERT(val.IsEmbedded()); + if (RowType->GetMemberName(column) == "ts") { + auto ts = val.Get<ui64>(); + if (watermark) { + UNIT_ASSERT_GT_C(ts, watermark->Seconds(), ts << " >= " << watermark->Seconds()); + } + return true; + } + UNIT_ASSERT_EQUAL(RowType->GetMemberName(column), "id"); + auto data = val.Get<i32>(); + LOG_D(data); + ++receivedData[data]; + return true; + }, + [this, &watermark](const auto& receivedWatermark) { + watermark = receivedWatermark; + LOG_D("Got watermark " << *watermark); + }, + [this, &asyncCA]() { + DumpMonPage(asyncCA, [this](auto&& str) { + UNIT_ASSERT_STRING_CONTAINS(str, "<h3>Sources</h3>"); + UNIT_ASSERT_STRING_CONTAINS(str, LogPrefix); + // TODO add validation + LOG_D(str); + }); + }, + dqInputChannel)) + {} + } catch(...) { + DumpMonPage(asyncCA, [this](auto&& str) { + LOG_D(str); + }); + throw; + } + UNIT_ASSERT_EQUAL_C(receivedData.size(), val, "expected size " << val << " != " << receivedData.size()); + for (; val > 0; --val) { + UNIT_ASSERT_EQUAL_C(receivedData[val * val], 1, "expected count for " << (val * val)); + } + if (expectedWatermark) { + WEAK_UNIT_ASSERT(!!watermark); + if (watermark) { + UNIT_ASSERT_LE_C(*watermark, expectedWatermark, "Expected " << (*watermark) << " <= " << expectedWatermark); + WEAK_UNIT_ASSERT_EQUAL_C(*watermark, expectedWatermark, "Expected " << (*watermark) << " == " << expectedWatermark << ", Watermark Delay is " << (*expectedWatermark - *watermark)); + LOG_D("Last watermark " << *watermark); + } else { + LOG_E("NO WATERMARK"); + } + } else { + UNIT_ASSERT(!watermark); + } + } + + void InputTransformMultichannelTests(ui32 packets, ui32 watermarkPeriod, bool waitIntermediateAcks, ui32 numChannels, auto& rng) { LogPrefix = TStringBuilder() << "InputTransform Test for:" << " packets=" << packets - << " doWatermark=" << doWatermark + << " watermarkPeriod=" << watermarkPeriod << " waitIntermediateAcks=" << waitIntermediateAcks + << " channels=" << numChannels << " "; NDqProto::TDqTask task; GenerateEmptyProgram(task, [](auto& ctx) { @@ -728,7 +853,7 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { return structType; }); TMap<i32, ui32> expectedData; - auto dqOutputChannel = AddDummyInputChannel(task, InputChannelId); + auto dqOutputChannels = AddDummyInputChannels(task, InputChannelId, numChannels); auto dqInputChannel = AddDummyOutputChannel(task, OutputChannelId, (IsWide ? static_cast<TType*>(WideRowTransformedType) : RowTransformedType)); SetInputTransform(task, TDataType::Create(NUdf::TDataType<i32>::Id, TypeEnv), @@ -737,12 +862,19 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { auto asyncCA = CreateTestAsyncCA(task); ActorSystem.EnableScheduleForActor(asyncCA, true); - ui32 seqNo = 0; + ActorSystem.GrabEdgeEvent<TEvDqCompute::TEvState>(EdgeActor); + ui32 val = 0; - for (ui32 packet = 1; packet <= packets; ++packet) { - bool isFinal = packet == packets; - bool noAck = (packet % 2) == 0; // set noAck on even packets + TMaybe<TInstant> expectedWatermark; + TVector<ui32> seqNo(numChannels); + TVector<ui64> activeChannels(numChannels); + std::iota(activeChannels.begin(), activeChannels.end(), 0); + SendData([&](auto packet, bool isFinal) { + auto channelIdxIdx = rng() % activeChannels.size(); + std::swap(activeChannels[channelIdxIdx], activeChannels.back()); + auto channelIdx = activeChannels.back(); + auto dqOutputChannel = dqOutputChannels[channelIdx]; PushRow(CreateRow(++val, packet), dqOutputChannel); ++expectedData[val]; PushRow(CreateRow(++val, packet), dqOutputChannel); @@ -756,36 +888,27 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { ++expectedData[val]; PushRow(CreateRow(++val, packet), dqOutputChannel); ++expectedData[val]; - if (doWatermark) { + if (watermarkPeriod && packet % watermarkPeriod == 0) { NDqProto::TWatermark watermark; watermark.SetTimestampUs(TInstant::Seconds(packet).MicroSeconds()); dqOutputChannel->Push(std::move(watermark)); + expectedWatermark = std::max(expectedWatermark, TMaybe<TInstant>(TInstant::Seconds(packet))); } - if (isFinal) { + if (isFinal || activeChannels.size() > 1 && rng() % std::max(packets/numChannels, ui32{1}) == 0) { + // when we have more than one active channels left, we may randomly finish it midway dqOutputChannel->Finish(); + activeChannels.pop_back(); } - - auto evInputChannelData = MakeHolder<TEvDqCompute::TEvChannelData>(); - evInputChannelData->Record.SetSeqNo(++seqNo); - evInputChannelData->Record.SetNoAck(noAck); - auto& chData = *evInputChannelData->Record.MutableChannelData(); - if (TDqSerializedBatch serializedBatch; dqOutputChannel->Pop(serializedBatch)) { - *chData.MutableData() = serializedBatch.Proto; - Y_ENSURE(serializedBatch.Payload.Empty()); // TODO - } - if (NDqProto::TWatermark watermark; dqOutputChannel->Pop(watermark)) { - *chData.MutableWatermark() = watermark; - } - if (NDqProto::TCheckpoint checkpoint; dqOutputChannel->Pop(checkpoint)) { - *chData.MutableCheckpoint() = checkpoint; - } - chData.SetChannelId(InputChannelId); - chData.SetFinished(dqOutputChannel->IsFinished()); - LOG_D("Sending " << packet << "/" << packets << " " << chData); - ActorSystem.Send(asyncCA, SrcEdgeActor, evInputChannelData.Release()); - if ((isFinal || waitIntermediateAcks) && !noAck) { - WaitForChannelDataAck(InputChannelId, seqNo); + return std::pair { dqOutputChannel, &seqNo[channelIdx] }; + }, + asyncCA, packets, waitIntermediateAcks); + // Finish all unfinished channels (when we have more than one channel left, only one is forcibly finished on final packet) + for (ui32 channelIdx = 0; channelIdx < numChannels; ++channelIdx) { + auto dqOutputChannel = dqOutputChannels[channelIdx]; + if (dqOutputChannel->IsFinished()) { + continue; } + SendFinish(asyncCA, dqOutputChannel, &seqNo[channelIdx]); } TMap<i32, ui32> receivedData; @@ -799,16 +922,16 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { if (columnName == "e.id") { UNIT_ASSERT(!!val); UNIT_ASSERT(val.IsEmbedded()); - LOG_D(column << "id = " << val.Get<i32>()); + LOG_D(column << " id = " << val.Get<i32>()); col0 = val.Get<i32>(); ++receivedData[val.Get<i32>()]; } else if (columnName == "e.ts") { UNIT_ASSERT(!!val); UNIT_ASSERT(val.IsEmbedded()); auto ts = val.Get<ui64>(); - LOG_D(column << "ts = " << ts); + LOG_D(column << " ts = " << ts); if (watermark) { - WEAK_UNIT_ASSERT_GT_C(ts, watermark->Seconds(), "Timestamp " << ts << " before watermark: " << watermark->Seconds()); + UNIT_ASSERT_GT_C(ts, watermark->Seconds(), "Timestamp " << ts << " before watermark: " << watermark->Seconds()); } } else if (columnName == "u.key") { if (col0 >= MinTransformedValue && col0 <= MaxTransformedValue) { @@ -817,10 +940,11 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { UNIT_ASSERT(!!cval); UNIT_ASSERT(cval.IsEmbedded()); auto data = cval.Get<i32>(); - LOG_D(column << "key = " << data); + LOG_D(column << " key = " << data); UNIT_ASSERT_EQUAL_C(data, col0, data << "!=" << col0); } else { UNIT_ASSERT_C(!val, "null (1) expected for " << col0); + LOG_D(column << " key IS NULL"); } } else if (columnName == "u.data") { if (col0 >= MinTransformedValue && col0 <= MaxTransformedValue) { @@ -828,10 +952,11 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { const auto cval = val.GetOptionalValue(); UNIT_ASSERT(!!cval); auto ref = TString(cval.AsStringRef()); - LOG_D(column << "data = '" << ref << "'"); + LOG_D(column << " data = '" << ref << "'"); UNIT_ASSERT_EQUAL(ref, ToString(col0)); } else { UNIT_ASSERT_C(!val, "null (2) expected for " << col0); + LOG_D(column << " data IS NULL"); } } else { UNIT_ASSERT_C(false, "Unexpected column " << column << " name " << columnName); @@ -842,27 +967,48 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture { watermark = receivedWatermark; LOG_D("Got watermark " << *watermark); }, + [this, &asyncCA]() { + DumpMonPage(asyncCA, [this](auto&& str) { + UNIT_ASSERT_STRING_CONTAINS(str, "<h3>Sources</h3>"); + UNIT_ASSERT_STRING_CONTAINS(str, LogPrefix); + // TODO add validation + LOG_D(str); + }); + }, dqInputChannel)) {} - DumpMonPage(asyncCA, [this](auto&& str) { - UNIT_ASSERT_STRING_CONTAINS(str, "<h3>Sources</h3>"); - UNIT_ASSERT_STRING_CONTAINS(str, LogPrefix); - // TODO add validation - LOG_D(str); - }); - UNIT_ASSERT_EQUAL(receivedData.size(), expectedData.size()); + UNIT_ASSERT_EQUAL_C(receivedData.size(), expectedData.size(), "received " << receivedData.size() << " != expected " << expectedData.size()); for (auto [receivedVal, receivedCnt] : receivedData) { UNIT_ASSERT_EQUAL_C(receivedCnt, expectedData[receivedVal], "expected count for " << receivedVal << ": " << receivedCnt << " != " << expectedData[receivedVal]); } - if (doWatermark) { + if (expectedWatermark) { WEAK_UNIT_ASSERT(!!watermark); if (watermark) { + UNIT_ASSERT_LE_C(*watermark, expectedWatermark, "Expected " << (*watermark) << " <= " << expectedWatermark); + WEAK_UNIT_ASSERT_EQUAL_C(*watermark, expectedWatermark, "Expected " << (*watermark) << " == " << expectedWatermark << ", Watermark Delay is " << (*expectedWatermark - *watermark)); LOG_D("Last watermark " << *watermark); } else { LOG_E("NO WATERMARK"); } + } else { + UNIT_ASSERT(!watermark); } } + + auto GetRandomSeed() { + uint32_t seed = 0; // by default tests are reproducible (fixed-seed PRNG) + if (auto env = getenv("RANDOM_SEED")) { + if (*env) { + // with non-empty $RANDOM_SEED use it as seed (to reproduce random test failures) + seed = ::FromString<uint32_t>(env); + } else { + // with empty $RANDOM_SEED make tests truly random + seed = (std::random_device {})(); + Cerr << "RANDOM_SEED=" << seed << Endl; + } + } + return seed; + } }; } //namespace anonymous @@ -871,20 +1017,44 @@ Y_UNIT_TEST_SUITE(TAsyncComputeActorTest) { Y_UNIT_TEST_F(Empty, TAsyncCATestFixture) { } Y_UNIT_TEST_F(Basic, TAsyncCATestFixture) { + TVector<ui32> sizes{ 1, 2, 3, 4, 5, 51, 128, 251 }; + auto seed = GetRandomSeed(); + std::mt19937 rng(seed); + for (ui32 t = 0; t < 16; ++t) sizes.push_back(1 + rng() % 734); for (bool waitIntermediateAcks : { false, true }) { - for (bool doWatermark : { false, true }) { - for (ui32 packets : { 1, 2, 3, 4, 5 }) { - BasicTests(packets, doWatermark, waitIntermediateAcks); + for (ui32 watermarkPeriod : { 0, 1, 3 }) { + for (ui32 packets : sizes) { + for (ui32 numChannels : { 1, 3, 7, 16 }) { + std::mt19937 trng(seed); + BasicMultichannelTests(packets, watermarkPeriod, waitIntermediateAcks, numChannels, trng); + } } } } } - Y_UNIT_TEST_F(InputTransform, TAsyncCATestFixture) { - for (bool waitIntermediateAcks : { false, true }) { - for (bool doWatermark : { false, true }) { - for (ui32 packets : { 1, 2, 3, 4, 5, 111 }) { - InputTransformTests(packets, doWatermark, waitIntermediateAcks); + Y_UNIT_TEST_F(StatsMode, TAsyncCATestFixture) { + for (auto statsMode : { + NDqProto::DQ_STATS_MODE_NONE, + NDqProto::DQ_STATS_MODE_BASIC, + NDqProto::DQ_STATS_MODE_FULL, + NDqProto::DQ_STATS_MODE_PROFILE, + }) { + std::mt19937 rng; + BasicMultichannelTests(5, 1, true, 1, rng, statsMode); + } + } + + Y_UNIT_TEST_F(InputTransformMultichannel, TAsyncCATestFixture) { + TVector<ui32> sizes{ 1, 2, 3, 4, 5, 51, 128, 251 }; + std::mt19937 rng(GetRandomSeed()); + for (ui32 t = 0; t < 16; ++t) sizes.push_back(1 + rng() % 734); + for (ui32 numChannels: { 1, 2, 7, 11 }) { + for (bool waitIntermediateAcks : { false, true }) { + for (ui32 watermarkPeriod : { 0, 1, 3 }) { + for (ui32 packets : sizes) { + InputTransformMultichannelTests(packets, watermarkPeriod, waitIntermediateAcks, numChannels, rng); + } } } } diff --git a/ydb/library/yql/dq/actors/compute/ut/mock_lookup_factory.cpp b/ydb/library/yql/dq/actors/compute/ut/mock_lookup_factory.cpp index a7c56e0ad3d..5bf48f39be1 100644 --- a/ydb/library/yql/dq/actors/compute/ut/mock_lookup_factory.cpp +++ b/ydb/library/yql/dq/actors/compute/ut/mock_lookup_factory.cpp @@ -139,7 +139,7 @@ private: if (PayloadType->GetMemberType(0)->IsOptional()) { valueItems[0] = valueItems[0].MakeOptional(); } - } else if (val == 999) { // simulate error + } else if (val == 9999) { // simulate error SendError(); return; } diff --git a/ydb/library/yql/dq/actors/compute/ut/ya.make b/ydb/library/yql/dq/actors/compute/ut/ya.make index 2bb32ad97ed..02bbde55132 100644 --- a/ydb/library/yql/dq/actors/compute/ut/ya.make +++ b/ydb/library/yql/dq/actors/compute/ut/ya.make @@ -41,6 +41,8 @@ PEERDIR( YQL_LAST_ABI_VERSION() +SIZE(MEDIUM) + END() RECURSE( diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h index 769a44b7ab5..e61ef571cd3 100644 --- a/ydb/library/yql/dq/actors/task_runner/events.h +++ b/ydb/library/yql/dq/actors/task_runner/events.h @@ -119,17 +119,19 @@ struct TEvOutputChannelDataRequest struct TEvInputChannelData : NActors::TEventLocal<TEvInputChannelData, TTaskRunnerEvents::EvInputChannelData> { - TEvInputChannelData(ui32 channelId, std::optional<TDqSerializedBatch>&& data, bool finish, bool pauseAfterPush) + TEvInputChannelData(ui32 channelId, std::optional<TDqSerializedBatch>&& data, bool finish, bool pauseAfterPush, const TMaybe<TInstant>& watermarkAfterPush = {}) : ChannelId(channelId) , Data(std::move(data)) , Finish(finish) , PauseAfterPush(pauseAfterPush) + , WatermarkAfterPush(watermarkAfterPush) { } const ui32 ChannelId; std::optional<TDqSerializedBatch> Data; //not const, because we want to efficiently move data out of this event on a reciever side const bool Finish; const bool PauseAfterPush; + const TMaybe<TInstant> WatermarkAfterPush; }; //Sent by TaskRunnerActor to ComputeActor to ackonowledge input data received in TEvInputChannelData @@ -223,7 +225,9 @@ struct TEvTaskRunFinished const TDqMemoryQuota::TProfileStats& profileStats = {}, ui64 mkqlMemoryLimit = 0, THolder<TMiniKqlProgramState>&& programState = nullptr, - bool watermarkInjectedToOutputs = false, + TMaybe<TInstant> watermarkInjectedToOutputs = Nothing(), + TVector<ui32>&& finishedInputsWithWatermarks = {}, + TVector<ui32>&& finishedSourcesWithWatermarks = {}, bool checkpointRequestedFromTaskRunner = false, TDuration computeTime = TDuration::Zero()) : RunStatus(runStatus) @@ -234,6 +238,8 @@ struct TEvTaskRunFinished , MkqlMemoryLimit(mkqlMemoryLimit) , ProgramState(std::move(programState)) , WatermarkInjectedToOutputs(watermarkInjectedToOutputs) + , FinishedInputsWithWatermarks(finishedInputsWithWatermarks) + , FinishedSourcesWithWatermarks(finishedSourcesWithWatermarks) , CheckpointRequestedFromTaskRunner(checkpointRequestedFromTaskRunner) , ComputeTime(computeTime) { } @@ -246,7 +252,9 @@ struct TEvTaskRunFinished TDqMemoryQuota::TProfileStats ProfileStats; ui64 MkqlMemoryLimit = 0; THolder<TMiniKqlProgramState> ProgramState; - bool WatermarkInjectedToOutputs = false; + TMaybe<TInstant> WatermarkInjectedToOutputs = Nothing(); + TVector<ui32> FinishedInputsWithWatermarks; + TVector<ui32> FinishedSourcesWithWatermarks; bool CheckpointRequestedFromTaskRunner = false; TDuration ComputeTime; }; @@ -296,16 +304,6 @@ struct TEvOutputChannelData bool Changed; }; -struct TWatermarkRequest { - TWatermarkRequest() = default; - - explicit TWatermarkRequest(TInstant watermark) - : Watermark(watermark) { - } - - TInstant Watermark; -}; - // Holds info required to inject barriers to outputs struct TCheckpointRequest { explicit TCheckpointRequest(const NDqProto::TCheckpoint& checkpoint) @@ -321,7 +319,7 @@ struct TEvContinueRun TEvContinueRun() = default; explicit TEvContinueRun( - TMaybe<TWatermarkRequest>&& watermarkRequest, + TMaybe<TInstant>&& watermarkRequest, TMaybe<TCheckpointRequest>&& checkpointRequest, bool checkpointOnly ) @@ -340,7 +338,7 @@ struct TEvContinueRun bool AskFreeSpace = true; const TVector<ui32> InputChannels; ui64 MemLimit; - TMaybe<TWatermarkRequest> WatermarkRequest = Nothing(); + TMaybe<TInstant> WatermarkRequest = Nothing(); TMaybe<TCheckpointRequest> CheckpointRequest = Nothing(); bool CheckpointOnly = false; }; diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index db196a295ab..c2e32e718a2 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -131,16 +131,55 @@ private: } bool ReadyToCheckpoint() { + for (const auto sourceId: Sources) { + const auto input = TaskRunner->GetSource(sourceId); + // sources are not polled upon checkpoint + if (!input->Empty()) { // check if buffer is empty + return false; + } + } for (const auto inputChannelId: InputsWithCheckpoints) { const auto input = TaskRunner->GetInputChannel(inputChannelId); - if (!input->IsPaused()) { + if (!input->IsPausedByCheckpoint()) { + return false; + } + if (!input->Empty()) { + return false; + } + } + for (const auto transformId: InputTransformsWithCheckpoints) { + const auto t = TaskRunner->GetInputTransform(transformId); + if (t) { + auto [_, transform] = *t; + if (!transform->Empty()) { + return false; + } + if (transform->IsPending()) { + return false; + } + } + } + return true; + } + + bool ReadyToWatermark() { + for (const auto sourceId: SourcesWithWatermarks) { + const auto input = TaskRunner->GetSource(sourceId); + // sources are not polled upon watermark + if (!input->Empty()) { // check if buffer is empty + return false; + } + } + for (const auto inputChannelId: InputsWithWatermarks) { + const auto input = TaskRunner->GetInputChannel(inputChannelId); + if (!input->IsPausedByWatermark()) { return false; } if (!input->Empty()) { return false; } } - for (const auto transformId: InputTransforms) { + for (const auto transformId: InputTransformsWithWatermarks) { const auto t = TaskRunner->GetInputTransform(transformId); if (t) { auto [_, transform] = *t; @@ -166,14 +205,26 @@ private: THashMap<ui32, i64> inputChannelFreeSpace; THashMap<ui32, i64> sourcesFreeSpace; - const bool shouldHandleWatermark = ev->Get()->WatermarkRequest.Defined() - && ev->Get()->WatermarkRequest->Watermark > TaskRunner->GetWatermark().WatermarkIn; + const auto& nextWatermark = ev->Get()->WatermarkRequest; + if (LastWatermark < nextWatermark) { + LastWatermark = *nextWatermark; + if (WatermarkRequests.empty()) { + if (HasActiveCheckpoint) { + LOG_T("Watermark delayed by checkpoint"); + } else { + PauseInputs(*nextWatermark); + } + } + WatermarkRequests.push_back(*nextWatermark); + } if (!ev->Get()->CheckpointOnly) { - if (shouldHandleWatermark) { - const auto watermark = ev->Get()->WatermarkRequest->Watermark; - LOG_T("Task runner. Inject watermark " << watermark); - TaskRunner->SetWatermarkIn(watermark); + if (!WatermarkRequests.empty()) { + auto watermarkRequest = WatermarkRequests.front(); + if (TaskRunner->GetWatermark().WatermarkIn < watermarkRequest && ReadyToWatermark()) { + LOG_T("Task runner. Inject watermark " << watermarkRequest); + TaskRunner->SetWatermarkIn(watermarkRequest); + } } res = TaskRunner->Run(); @@ -187,21 +238,31 @@ private: sourcesFreeSpace[index] = TaskRunner->GetSource(index)->GetFreeSpace(); } - auto watermarkInjectedToOutputs = false; + TMaybe<TInstant> watermarkInjectedToOutputs; THolder<TMiniKqlProgramState> mkqlProgramState; if (res == ERunStatus::PendingInput || res == ERunStatus::Finished) { - if (shouldHandleWatermark) { - const auto watermarkRequested = ev->Get()->WatermarkRequest->Watermark; - LOG_T("Task runner. Watermarks. Injecting requested watermark " << watermarkRequested + if (!WatermarkRequests.empty() && WatermarkRequests.front() == TaskRunner->GetWatermark().WatermarkIn) { + auto watermarkRequest = WatermarkRequests.front(); + WatermarkRequests.pop_front(); + LOG_T("Task runner. Watermarks. Injecting requested watermark " << watermarkRequest << " to " << OutputsWithWatermarks.size() << " outputs "); for (const auto& channelId : OutputsWithWatermarks) { NDqProto::TWatermark watermark; - watermark.SetTimestampUs(watermarkRequested.MicroSeconds()); + watermark.SetTimestampUs(watermarkRequest.MicroSeconds()); TaskRunner->GetOutputChannel(channelId)->Push(std::move(watermark)); } - - watermarkInjectedToOutputs = true; + ResumeByWatermark(watermarkRequest); + watermarkInjectedToOutputs = watermarkRequest; + if (!WatermarkRequests.empty()) { + if (HasActiveCheckpoint) { + LOG_T("Next watermark delayed by active checkpoint"); + } else { + auto nextWatermarkRequest = WatermarkRequests.front(); + LOG_T("Task runner. Re-pause on watermark " << nextWatermarkRequest); + PauseInputs(nextWatermarkRequest); + } + } } if (ev->Get()->CheckpointRequest.Defined() && ReadyToCheckpoint()) { @@ -223,6 +284,40 @@ private: LOG_E("Failed to save state: " << e.what()); mkqlProgramState = nullptr; } + HasActiveCheckpoint = false; + if (!WatermarkRequests.empty()) { + auto nextWatermarkRequest = WatermarkRequests.front(); + LOG_T("Task runner. Pause by watermark " << nextWatermarkRequest); + PauseInputs(nextWatermarkRequest); + } + } + } + + TVector<ui32> finishedInputsWithWatermarks; + TVector<ui32> finishedSourcesWithWatermarks; + if (WatermarkRequests.empty()) { + // check if any of inputs become empty and finished and drop them off + for (ui32 i = InputsWithWatermarksPendingFinish; i < InputsWithWatermarks.size(); ) { + auto& channelId = InputsWithWatermarks[i]; + auto inputChannel = TaskRunner->GetInputChannel(channelId); + if (inputChannel->IsFinished()) { + finishedInputsWithWatermarks.push_back(channelId); + std::swap(channelId, InputsWithWatermarks.back()); + InputsWithWatermarks.pop_back(); + } else { + ++i; + } + } + for (ui32 i = SourcesWithWatermarksPendingFinish; i < SourcesWithWatermarks.size(); ) { + auto& sourceId = SourcesWithWatermarks[i]; + auto source = TaskRunner->GetSource(sourceId); + if (source->IsFinished()) { + finishedSourcesWithWatermarks.push_back(sourceId); + std::swap(sourceId, SourcesWithWatermarks.back()); + SourcesWithWatermarks.pop_back(); + } else { + ++i; + } } } @@ -258,6 +353,8 @@ private: MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : 0, std::move(mkqlProgramState), watermarkInjectedToOutputs, + std::move(finishedInputsWithWatermarks), + std::move(finishedSourcesWithWatermarks), ev->Get()->CheckpointRequest.Defined(), TInstant::Now() - start), /*flags=*/0, @@ -275,9 +372,26 @@ private: const ui64 freeSpace = inputChannel->GetFreeSpace(); if (finish) { inputChannel->Finish(); + + // check if finished channel was tracked for watermarks move them to Pending part + Y_DEBUG_ABORT_UNLESS(InputsWithWatermarksPendingFinish <= InputsWithWatermarks.size()); + const auto end = InputsWithWatermarks.begin() + InputsWithWatermarksPendingFinish; + auto it = std::find(InputsWithWatermarks.begin(), end, channelId); // O(n), but rare/once-per-channel + if (it != end) { + Y_DEBUG_ABORT_UNLESS(InputsWithWatermarksPendingFinish > 0); + --InputsWithWatermarksPendingFinish; + std::swap(*it, InputsWithWatermarks[InputsWithWatermarksPendingFinish]); + } } if (ev->Get()->PauseAfterPush) { - inputChannel->Pause(); + HasActiveCheckpoint = true; + inputChannel->PauseByCheckpoint(); + } + if (ev->Get()->WatermarkAfterPush) { + LOG_T("Adding " << *ev->Get()->WatermarkAfterPush); + inputChannel->AddWatermark(*ev->Get()->WatermarkAfterPush); + } else { + LOG_T("No watermark "); } // run @@ -299,6 +413,15 @@ private: source->Push(std::move(batch), space); if (finish) { source->Finish(); + + Y_DEBUG_ABORT_UNLESS(SourcesWithWatermarksPendingFinish <= SourcesWithWatermarks.size()); + const auto end = SourcesWithWatermarks.begin() + SourcesWithWatermarksPendingFinish; + auto it = std::find(SourcesWithWatermarks.begin(), end, index); // O(n), but rare/once-per-channel + if (it != end) { + Y_DEBUG_ABORT_UNLESS(SourcesWithWatermarksPendingFinish > 0); + --SourcesWithWatermarksPendingFinish; + std::swap(*it, SourcesWithWatermarks[SourcesWithWatermarksPendingFinish]); + } } Send( ParentId, @@ -357,7 +480,7 @@ private: checkpoint = hasCheckpoint ? std::move(poppedCheckpoint) : TMaybe<NDqProto::TCheckpoint>(); if (hasCheckpoint) { - ResumeInputs(); + ResumeByCheckpoint(); break; } @@ -381,9 +504,21 @@ private: ev->Cookie); } - void ResumeInputs() { - for (const auto& inputId : Inputs) { - TaskRunner->GetInputChannel(inputId)->Resume(); + void ResumeByCheckpoint() { + for (const auto& inputId : InputsWithCheckpoints) { + TaskRunner->GetInputChannel(inputId)->ResumeByCheckpoint(); + } + } + + void ResumeByWatermark(TInstant watermark) { + for (const auto& inputId : InputsWithWatermarks) { + TaskRunner->GetInputChannel(inputId)->ResumeByWatermark(watermark); + } + } + + void PauseInputs(TInstant watermark) { + for (const auto& inputId : InputsWithWatermarks) { + TaskRunner->GetInputChannel(inputId)->PauseByWatermark(watermark); } } @@ -404,7 +539,7 @@ private: if (hasCheckpoint) { checkpointSize = checkpoint.ByteSize(); maybeCheckpoint.ConstructInPlace(std::move(checkpoint)); - ResumeInputs(); + ResumeByCheckpoint(); } const bool finished = sink->IsFinished(); const bool changed = finished || size > 0 || hasCheckpoint; @@ -422,19 +557,43 @@ private: auto& inputs = settings.GetInputs(); for (auto inputId = 0; inputId < inputs.size(); inputId++) { auto& input = inputs[inputId]; + bool inputWatermarksDisabled = false; + bool inputCheckpointDisabled = false; if (input.HasSource()) { Sources.emplace_back(inputId); + if (input.GetSource().GetWatermarksMode() == NDqProto::WATERMARKS_MODE_DISABLED) { + inputWatermarksDisabled = true; + } else { + SourcesWithWatermarks.emplace_back(inputId); + } } else { for (auto& channel : input.GetChannels()) { Inputs.emplace_back(channel.GetId()); if (channel.GetCheckpointingMode() != NDqProto::CHECKPOINTING_MODE_DISABLED) { InputsWithCheckpoints.emplace_back(channel.GetId()); + } else { + inputCheckpointDisabled = true; + } + if (channel.GetWatermarksMode() != NDqProto::WATERMARKS_MODE_DISABLED) { + InputsWithWatermarks.emplace_back(channel.GetId()); + } else { + inputWatermarksDisabled = true; } } } + if (input.HasTransform()) { + if (!inputWatermarksDisabled) { + InputTransformsWithWatermarks.emplace_back(inputId); + } + if (!inputCheckpointDisabled) { + InputTransformsWithCheckpoints.emplace_back(inputId); + } + } } std::sort(Inputs.begin(), Inputs.end()); Y_ENSURE(std::unique(Inputs.begin(), Inputs.end()) == Inputs.end()); + InputsWithWatermarksPendingFinish = InputsWithWatermarks.size(); + SourcesWithWatermarksPendingFinish = SourcesWithWatermarks.size(); auto& outputs = settings.GetOutputs(); for (auto outputId = 0; outputId < outputs.size(); outputId++) { @@ -526,14 +685,23 @@ private: const ui64 TaskId; TVector<ui32> Inputs; TVector<ui32> InputsWithCheckpoints; + TVector<ui32> InputsWithWatermarks; + ui32 InputsWithWatermarksPendingFinish; // index in InputsWithWatermarks after which source buffers has pending finished mark TVector<ui32> InputTransforms; + TVector<ui32> InputTransformsWithCheckpoints; + TVector<ui32> InputTransformsWithWatermarks; TVector<ui32> Sources; + TVector<ui32> SourcesWithWatermarks; + ui32 SourcesWithWatermarksPendingFinish; // index in SourcesWithWatermarks after which source buffers has pending finished mark TVector<ui32> Sinks; TVector<ui32> Outputs; TVector<ui32> OutputsWithWatermarks; TIntrusivePtr<NDq::IDqTaskRunner> TaskRunner; THolder<TDqMemoryQuota> MemoryQuota; ui64 ActorElapsedTicks = 0; + TMaybe<TInstant> LastWatermark; + std::deque<TInstant> WatermarkRequests; + bool HasActiveCheckpoint = false; }; struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory { diff --git a/ydb/library/yql/dq/runtime/dq_async_input.cpp b/ydb/library/yql/dq/runtime/dq_async_input.cpp index 7f6265660b5..5c4701a9a32 100644 --- a/ydb/library/yql/dq/runtime/dq_async_input.cpp +++ b/ydb/library/yql/dq/runtime/dq_async_input.cpp @@ -58,7 +58,7 @@ public: } bool IsPending() const override { - return Pending; + return Pending && !IsFinished(); } }; diff --git a/ydb/library/yql/dq/runtime/dq_input.h b/ydb/library/yql/dq/runtime/dq_input.h index 945430362be..db2a4bf6292 100644 --- a/ydb/library/yql/dq/runtime/dq_input.h +++ b/ydb/library/yql/dq/runtime/dq_input.h @@ -43,9 +43,24 @@ public: // After pause IDqInput::Pop() stops return batches that were pushed before pause // and returns Empty() after all the data before pausing was read. // Compute Actor can push data after pause, but program won't receive it until Resume() is called. - virtual void Pause() = 0; - virtual void Resume() = 0; - virtual bool IsPaused() const = 0; + virtual void PauseByCheckpoint() = 0; + virtual void ResumeByCheckpoint() = 0; + virtual bool IsPausedByCheckpoint() const = 0; + // Watermarks + // Called after receiving watermark (watermark position remembered, + // but does not pause channel); watermark may be pushed behind new data + // by reading or checkpoint + virtual void AddWatermark(TInstant watermark) = 0; + // Called after watermark is ready for TaskRunner (got watermarks on all channels; + // implies channel must already contain greater-or-equal watermark); + // Same as with PauseByCheckpoint, any data added adter Pause is not received until Resume; + // If called after PauseByCheckpoint(), checkpoint takes priority + virtual void PauseByWatermark(TInstant watermark) = 0; + // Called after watermark processed by TaskRunner; + // If called before PauseByCheckpoint, all watermarks greater than this + // moved after checkpoint (with no data between checkpoint and watermark) + virtual void ResumeByWatermark(TInstant watermark) = 0; + virtual bool IsPausedByWatermark() const = 0; }; } // namespace NYql::NDq diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.cpp b/ydb/library/yql/dq/runtime/dq_input_channel.cpp index f0098f0fc0f..cbc224daed6 100644 --- a/ydb/library/yql/dq/runtime/dq_input_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_input_channel.cpp @@ -107,9 +107,19 @@ public: return (DataForDeserialize.empty() || Impl.IsPaused()) && Impl.Empty(); } - void Pause() override { + void PauseByCheckpoint() override { DeserializeAllData(); - Impl.Pause(); + Impl.PauseByCheckpoint(); + } + + void AddWatermark(TInstant watermark) override { + DeserializeAllData(); + Impl.AddWatermark(watermark); + } + + void PauseByWatermark(TInstant watermark) override { + DeserializeAllData(); + Impl.PauseByWatermark(watermark); } bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) override { @@ -147,12 +157,20 @@ public: return Impl.GetInputType(); } - void Resume() override { - Impl.Resume(); + void ResumeByCheckpoint() override { + Impl.ResumeByCheckpoint(); + } + + bool IsPausedByCheckpoint() const override { + return Impl.IsPausedByCheckpoint(); + } + + void ResumeByWatermark(TInstant watermark) override { + Impl.ResumeByWatermark(watermark); } - bool IsPaused() const override { - return Impl.IsPaused(); + bool IsPausedByWatermark() const override { + return Impl.IsPausedByWatermark(); } void Finish() override { diff --git a/ydb/library/yql/dq/runtime/dq_input_impl.h b/ydb/library/yql/dq/runtime/dq_input_impl.h index 883a927fb4f..37c0b9e31ef 100644 --- a/ydb/library/yql/dq/runtime/dq_input_impl.h +++ b/ydb/library/yql/dq/runtime/dq_input_impl.h @@ -36,11 +36,6 @@ public: return StoredBytes; } - [[nodiscard]] - bool Empty() const override { - return Batches.empty() || (IsPaused() && GetBatchesBeforePause() == 0); - } - bool IsLegacySimpleBlock(NKikimr::NMiniKQL::TStructType* structType, ui32& blockLengthIndex) { auto index = structType->FindMemberIndex(BlockLengthColumnName); if (index) { @@ -169,6 +164,7 @@ public: } Batches.emplace_back(std::move(batch)); + AddBatchCounts(space, rows); return rows; } @@ -186,12 +182,12 @@ public: auto& popStats = static_cast<TDerived*>(this)->PopStats; popStats.Resume(); //save timing before processing - ui64 popBytes = 0; - if (IsPaused()) { - ui64 batchesCount = GetBatchesBeforePause(); - Y_ABORT_UNLESS(batchesCount > 0); - Y_ABORT_UNLESS(batchesCount <= Batches.size()); + auto [popBytes, popRows, batchesCount] = PopReadyCounts(); + Y_ABORT_UNLESS(batchesCount > 0); + Y_ABORT_UNLESS(batchesCount <= Batches.size()); + + if (batchesCount != Batches.size()) { if (batch.IsWide()) { while (batchesCount--) { @@ -211,15 +207,8 @@ public: } } - popBytes = StoredBytesBeforePause; - - BatchesBeforePause = PauseMask; - Y_ABORT_UNLESS(GetBatchesBeforePause() == 0); - StoredBytes -= StoredBytesBeforePause; - StoredRows -= StoredRowsBeforePause; - StoredBytesBeforePause = 0; - StoredRowsBeforePause = 0; } else { + if (batch.IsWide()) { for (auto&& part : Batches) { part.ForEachRowWide([&batch](NUdf::TUnboxedValue* values, ui32 width) { @@ -234,16 +223,15 @@ public: } } - popBytes = StoredBytes; - - StoredBytes = 0; - StoredRows = 0; Batches.clear(); } + StoredBytes -= popBytes; + StoredRows -= popRows; + if (popStats.CollectBasic()) { popStats.Bytes += popBytes; - popStats.Rows += GetRowsCount(batch); + popStats.Rows += popRows; popStats.Chunks++; } @@ -263,27 +251,166 @@ public: return InputType; } - void Pause() override { - Y_ABORT_UNLESS(!IsPaused()); - BatchesBeforePause = Batches.size() | PauseMask; - StoredRowsBeforePause = StoredRows; - StoredBytesBeforePause = StoredBytes; + [[nodiscard]] + bool Empty() const override { + return Batches.empty() || IsPaused() && BeforeBarrier.Batches == 0; + } + +private: + void AddBatchCounts(ui64 space, ui64 rows) { + auto& barrier = PendingBarriers.empty() ? BeforeBarrier : PendingBarriers.back(); + barrier.Batches ++; + barrier.Bytes += space; + barrier.Rows += rows; + } + + std::tuple<ui64, ui64, ui64> PopReadyCounts() { + if (!PendingBarriers.empty() && !IsPaused()) { + // There were watermarks, but channel is not paused + // Process data anyway and move watermarks behind + auto lastBarrier = PendingBarriers.back().Barrier; + for (const auto& barrier : PendingBarriers) { + Y_ENSURE(!barrier.IsCheckpoint()); + BeforeBarrier += barrier; + } + PendingBarriers.clear(); + PendingBarriers.emplace_back(TBarrier { .Barrier = lastBarrier }); + } + auto popBatches = BeforeBarrier.Batches; + auto popBytes = BeforeBarrier.Bytes; + auto popRows = BeforeBarrier.Rows; + + BeforeBarrier.Clear(); + return std::make_tuple(popBytes, popRows, popBatches); } - void Resume() override { - StoredBytesBeforePause = StoredRowsBeforePause = BatchesBeforePause = 0; - Y_ABORT_UNLESS(!IsPaused()); +public: + bool IsPaused() const { + return IsPausedByWatermark() || IsPausedByCheckpoint(); } - bool IsPaused() const override { - return BatchesBeforePause; +private: + void SkipWatermarksBeforeBarrier() { + // Drop watermarks before current barrier + while (!PendingBarriers.empty()) { + auto& barrier = PendingBarriers.front(); + if (barrier.Barrier >= PauseBarrier) { + break; + } + BeforeBarrier.Batches += barrier.Batches; + BeforeBarrier.Rows += barrier.Rows; + BeforeBarrier.Bytes += barrier.Bytes; + PendingBarriers.pop_front(); + } } -protected: - ui64 GetBatchesBeforePause() const { - return BatchesBeforePause & ~PauseMask; +public: + void PauseByWatermark(TInstant watermark) override { + Y_ENSURE(PauseBarrier <= watermark); + PauseBarrier = watermark; + if (IsPausedByCheckpoint()) { + return; + } + if (IsFinished()) { + return; + } + SkipWatermarksBeforeBarrier(); + Y_ENSURE(!PendingBarriers.empty()); + Y_ENSURE(PendingBarriers.front().Barrier >= watermark); } + void PauseByCheckpoint() override { + Y_ENSURE(!IsPausedByCheckpoint()); + if (PauseBarrier != TBarrier::NoBarrier) { + Y_ENSURE(!PendingBarriers.empty()); + if (PendingBarriers.front().Barrier > PauseBarrier) { + // (1.BeforeBarrier) (3.Watermark > PauseBarrier) (4.Some data and watermarks) | (5.Here will be checkpoint) + // -> + // (1.BeforeBarrier) (3.Fake watermark == PauseBarrier with data from 3 & 4 behind) (Checkpoint with empty data behind) (Max watermark from 3 & 4 with empty data behind) + auto lastWatermark = PendingBarriers.back().Barrier; + TBarrier fakeWatermark(PauseBarrier); + for (auto& barrier: PendingBarriers) { + fakeWatermark += barrier; + } + PendingBarriers.clear(); + PendingBarriers.emplace_back(fakeWatermark); + PendingBarriers.emplace_back(); // CheckpointBarrier + PendingBarriers.emplace_back(TBarrier { .Barrier = lastWatermark }); + } else { + Y_ENSURE(PendingBarriers.front().Barrier == PauseBarrier); + // (1.BeforeBarrier) (3.Watermark == PauseBarrier) (4.Some data and watermarks) | (5.Here will be checkpoint) + // -> + // (1.BeforeBarrier) (3.Watermark == PauseBarriers with all data from 3 & 4 behind) (Checkpoint with empty data behind) (Max watermark from 4 with empty data behind) + auto lastWatermark = PendingBarriers.size() > 1 ? PendingBarriers.back().Barrier : TBarrier::NoBarrier; + for (auto& firstWatermark = PendingBarriers.front(); PendingBarriers.size() > 1; PendingBarriers.pop_back()) { + firstWatermark += PendingBarriers.back(); + } + PendingBarriers.emplace_back(); // CheckpointBarrier + if (lastWatermark != TBarrier::NoBarrier) { + PendingBarriers.emplace_back(TBarrier { .Barrier = lastWatermark }); + } + } + } else if (PendingBarriers.empty()) { + PendingBarriers.emplace_front(); // CheckpointBarrier + } else { + // (1.BeforeBarrier) (4.Some data and watermarks) | (5.Here will be checkpoint) + // -> + // (1.BeforeBarrier + all data from 4) (5.Checkpoint with empty data behind) (Max watermark from 4 if any with empty data behind) + auto lastWatermark = PendingBarriers.back().Barrier; + for (auto& barrier: PendingBarriers) { // Move all collected data before checkpoint + BeforeBarrier += barrier; + } + PendingBarriers.clear(); + PendingBarriers.emplace_back(); // CheckpointBarrier + PendingBarriers.emplace_back(TBarrier { .Barrier = lastWatermark }); + } + } + + void AddWatermark(TInstant watermark) override { + if (!PendingBarriers.empty() && PendingBarriers.back().Batches == 0 && !PendingBarriers.back().IsCheckpoint()) { + Y_ENSURE(PendingBarriers.back().Rows == 0); + Y_ENSURE(PendingBarriers.back().Bytes == 0); + PendingBarriers.back().Barrier = watermark; + } else { + PendingBarriers.emplace_back(TBarrier { .Barrier = watermark }); + } + } + + bool IsPausedByWatermark() const override { + return !IsPausedByCheckpoint() && PauseBarrier != TBarrier::NoBarrier; + } + + bool IsPausedByCheckpoint() const override { + return !PendingBarriers.empty() && PendingBarriers.front().IsCheckpoint(); + } + + void ResumeByWatermark(TInstant watermark) override { + Y_ENSURE(Empty()); + Y_ENSURE(PauseBarrier == watermark); + PauseBarrier = TBarrier::NoBarrier; + if (IsFinished()) { + return; + } + Y_ENSURE(!PendingBarriers.empty()); + Y_ENSURE(PendingBarriers.front().Barrier >= watermark); + if (PendingBarriers.front().Barrier == watermark) { + BeforeBarrier = PendingBarriers.front(); + PendingBarriers.pop_front(); + } + Y_ENSURE(PendingBarriers.empty() || PendingBarriers.front().Barrier > watermark); + } + + void ResumeByCheckpoint() override { + Y_ENSURE(IsPausedByCheckpoint()); + Y_ENSURE(Empty()); + BeforeBarrier = PendingBarriers.front(); + PendingBarriers.pop_front(); + // There can be watermarks before current barrier exposed by checkpoint removal + SkipWatermarksBeforeBarrier(); + } + +protected: + TMaybe<ui32> GetWidth() const { return Width; } @@ -296,12 +423,35 @@ protected: ui64 StoredBytes = 0; ui64 StoredRows = 0; bool Finished = false; - ui64 BatchesBeforePause = 0; - ui64 StoredBytesBeforePause = 0; - ui64 StoredRowsBeforePause = 0; - static constexpr ui64 PauseMask = 1llu << 63llu; TInputChannelFormat Format = FORMAT_UNKNOWN; ui32 LegacyBlockLengthIndex = 0; + + struct TBarrier { + static constexpr TInstant NoBarrier = TInstant::Zero(); + static constexpr TInstant CheckpointBarrier = TInstant::Max(); + TInstant Barrier = CheckpointBarrier; + ui64 Batches = 0; + ui64 Bytes = 0; + ui64 Rows = 0; + // watermark (!= TInstant::Max()) or checkpoint (TInstant::Max()) + bool IsCheckpoint() const { + return Barrier == CheckpointBarrier; + } + TBarrier& operator+= (const TBarrier& other) { + Batches += other.Batches; + Bytes += other.Bytes; + Rows += other.Rows; + return *this; + } + void Clear() { + Batches = 0; + Bytes = 0; + Rows = 0; + } + }; + std::deque<TBarrier> PendingBarriers; // barrier and counts after barrier + TBarrier BeforeBarrier; // counts before barrier + TInstant PauseBarrier; // Watermark barrier or TBarrier::NoBarrier }; } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 07058c45a4e..440f94b3b20 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -736,15 +736,31 @@ public: ythrow yexception() << "unimplemented"; } - void Pause() override { + void PauseByCheckpoint() override { Y_ABORT("Checkpoints are not supported"); } - void Resume() override { + void ResumeByCheckpoint() override { Y_ABORT("Checkpoints are not supported"); } - bool IsPaused() const override { + bool IsPausedByCheckpoint() const override { + return false; + } + + void PauseByWatermark(TInstant) override { + Y_ABORT("Watermarks are not supported"); + } + + void AddWatermark(TInstant) override { + Y_ABORT("Watermarks are not supported"); + } + + void ResumeByWatermark(TInstant) override { + Y_ABORT("Watermarks are not supported"); + } + + bool IsPausedByWatermark() const override { return false; } @@ -892,15 +908,31 @@ public: return InputType; } - void Pause() override { + void PauseByCheckpoint() override { Y_ABORT("Checkpoints are not supported"); } - void Resume() override { + void ResumeByCheckpoint() override { Y_ABORT("Checkpoints are not supported"); } - bool IsPaused() const override { + bool IsPausedByCheckpoint() const override { + return false; + } + + void PauseByWatermark(TInstant) override { + Y_ABORT("Watermarks are not supported"); + } + + void AddWatermark(TInstant) override { + Y_ABORT("Watermarks are not supported"); + } + + void ResumeByWatermark(TInstant) override { + Y_ABORT("Watermarks are not supported"); + } + + bool IsPausedByWatermark() const override { return false; } |