summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: yumkam <[email protected]>, 2025-08-27 18:42:19 +0300
committer: GitHub <[email protected]>, 2025-08-27 18:42:19 +0300
commit: 4bf27a67141bbfcfe0d540eb525a2f7a42b3705b (patch)
tree: c35edc94f75a4aa172f46ee4a2924689cfb174ac
parent: 04d7c1c57c7bc770a2b7b4dd5e36a6b7a4349471 (diff)
async ca: rework watermark handling and fix inputtransform and watermark interaction (#22263) [refs: oidc-1.2.6.18-dev, oidc-1.2.6.0-dev]
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp 116
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h 5
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h 1
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h 51
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp 16
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h 2
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp 114
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h 12
-rw-r--r-- ydb/library/yql/dq/actors/compute/ut/dq_async_compute_actor_ut.cpp 450
-rw-r--r-- ydb/library/yql/dq/actors/compute/ut/mock_lookup_factory.cpp 2
-rw-r--r-- ydb/library/yql/dq/actors/compute/ut/ya.make 2
-rw-r--r-- ydb/library/yql/dq/actors/task_runner/events.h 28
-rw-r--r-- ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp 210
-rw-r--r-- ydb/library/yql/dq/runtime/dq_async_input.cpp 2
-rw-r--r-- ydb/library/yql/dq/runtime/dq_input.h 21
-rw-r--r-- ydb/library/yql/dq/runtime/dq_input_channel.cpp 30
-rw-r--r-- ydb/library/yql/dq/runtime/dq_input_impl.h 230
-rw-r--r-- ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp 44
18 files changed, 932 insertions, 404 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index debfebfa793..669ffb18a99 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -182,9 +182,7 @@ private:
DUMP(ProcessOutputsState, LastPopReturnedNoData);
html << "<h3>Watermarks</h3>";
- for (const auto& [time, id]: WatermarkTakeInputChannelDataRequests) {
- html << "WatermarkTakeInputChannelDataRequests: " << time.ToString() << " " << id << "<br />";
- }
+ DUMP(WatermarksTracker, GetPendingWatermark, ());
html << "<h3>CPU Quota</h3>";
html << "QuoterServiceActorId: " << QuoterServiceActorId.ToString() << "<br />";
@@ -251,7 +249,6 @@ private:
DUMP(info, ChannelId);
DUMP(info, SrcStageId);
DUMP(info, HasPeer);
- html << "PendingWatermarks: " << !info.PendingWatermarks.empty() << " " << (info.PendingWatermarks.empty() ? TString{} : info.PendingWatermarks.back().ToString()) << "<br />";
html << "WatermarksMode: " << NDqProto::EWatermarksMode_Name(info.WatermarksMode) << "<br />";
html << "PendingCheckpoint: " << info.PendingCheckpoint.has_value() << " " << (info.PendingCheckpoint ? TStringBuilder{} << info.PendingCheckpoint->GetId() << " " << info.PendingCheckpoint->GetGeneration() : TString{}) << "<br />";
html << "CheckpointingMode: " << NDqProto::ECheckpointingMode_Name(info.CheckpointingMode) << "<br />";
@@ -310,7 +307,6 @@ private:
html << "DqInputBuffer.InputType: " << (buffer->GetInputType() ? buffer->GetInputType()->GetKindAsStr() : TString{"unknown"}) << "<br />";
html << "DqInputBuffer.InputWidth: " << (buffer->GetInputWidth() ? ToString(*buffer->GetInputWidth()) : TString{"unknown"}) << "<br />";
html << "DqInputBuffer.IsFinished: " << buffer->IsFinished() << "<br />";
- html << "DqInputBuffer.IsPaused: " << buffer->IsPaused() << "<br />";
html << "DqInputBuffer.IsPending: " << buffer->IsPending() << "<br />";
const auto& popStats = buffer->GetPopStats();
@@ -525,7 +521,7 @@ private:
}
const bool wasFinished = outputChannel.Finished;
- auto channelId = outputChannel.ChannelId;
+ const auto channelId = outputChannel.ChannelId;
const auto& peerState = Channels->GetOutputChannelInFlightState(channelId);
@@ -620,18 +616,6 @@ private:
TMaybe<TInstant> watermark;
if (channelData.HasWatermark()) {
watermark = TInstant::MicroSeconds(channelData.GetWatermark().GetTimestampUs());
-
- const bool channelWatermarkChanged = WatermarksTracker.NotifyInChannelWatermarkReceived(
- inputChannel->ChannelId,
- *watermark
- );
-
- if (channelWatermarkChanged) {
- CA_LOG_T("Pause input channel " << channelData.GetChannelId() << " bacause of watermark " << *watermark);
- inputChannel->Pause(*watermark);
- }
-
- WatermarkTakeInputChannelDataRequests[*watermark]++;
}
TDqSerializedBatch batch;
@@ -647,7 +631,8 @@ private:
channelData.GetChannelId(),
batch.RowCount() ? std::optional{std::move(batch)} : std::nullopt,
finished,
- channelData.HasCheckpoint()
+ channelData.HasCheckpoint(),
+ watermark
);
Send(TaskRunnerActorId, ev.Release(), 0, Cookie);
@@ -681,22 +666,6 @@ private:
TBase::PassAway();
}
- TMaybe<TInstant> GetWatermarkRequest() {
- if (!WatermarksTracker.HasPendingWatermark()) {
- return Nothing();
- }
-
- const auto pendingWatermark = *WatermarksTracker.GetPendingWatermark();
- if (WatermarkTakeInputChannelDataRequests.contains(pendingWatermark)) {
- // Not all precending to watermark input channels data has been injected
- return Nothing();
- }
-
- MetricsReporter.ReportInjectedToTaskRunnerWatermark(pendingWatermark);
-
- return pendingWatermark;
- }
-
TMaybe<NDqProto::TCheckpoint> GetCheckpointRequest() {
if (!CheckpointRequestedFromTaskRunner && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved()) {
CheckpointRequestedFromTaskRunner = true;
@@ -806,9 +775,24 @@ private:
}
}
- if (ev->Get()->WatermarkInjectedToOutputs && !WatermarksTracker.HasOutputChannels()) {
- ResumeInputsByWatermark(*WatermarksTracker.GetPendingWatermark());
- WatermarksTracker.PopPendingWatermark();
+ if (auto watermark = ev->Get()->WatermarkInjectedToOutputs) {
+ ResumeInputsByWatermark(*watermark);
+ if (WatermarksTracker.NotifyWatermarkWasSent(*watermark)) {
+ MetricsReporter.ReportInjectedToOutputsWatermark(*watermark);
+ WatermarksTracker.PopPendingWatermark();
+ }
+ // sources or input channels were unpaused; trigger a new poll
+ ResumeExecution(EResumeSource::CAWatermarkInject);
+ }
+
+ for (auto inputChannelId : ev->Get()->FinishedInputsWithWatermarks) {
+ CA_LOG_T("Unregister watermarked input channel " << inputChannelId);
+ WatermarksTracker.UnregisterInputChannel(inputChannelId);
+ }
+
+ for (auto sourceId : ev->Get()->FinishedSourcesWithWatermarks) {
+ CA_LOG_T("Unregister watermarked async input " << sourceId);
+ WatermarksTracker.UnregisterAsyncInput(sourceId);
}
ReadyToCheckpointFlag = (bool) ev->Get()->ProgramState;
@@ -927,18 +911,6 @@ private:
ProcessOutputsState.HasDataToSend |= !outputChannel.Finished;
ProcessOutputsState.LastPopReturnedNoData = asyncData.Data.empty();
- if (asyncData.Watermark.Defined()) {
- const auto watermark = TInstant::MicroSeconds(asyncData.Watermark->GetTimestampUs());
- const bool shouldResumeInputs = WatermarksTracker.NotifyOutputChannelWatermarkSent(
- outputChannel.ChannelId,
- watermark
- );
-
- if (shouldResumeInputs) {
- ResumeInputsByWatermark(watermark);
- }
- }
-
if (!shouldSkipData) {
if (asyncData.Checkpoint.Defined()) {
ResumeInputsByCheckpoint();
@@ -985,28 +957,32 @@ private:
}
void OnInputChannelDataAck(NTaskRunnerActor::TEvInputChannelDataAck::TPtr& ev) {
- auto it = TakeInputChannelDataRequests.find(ev->Cookie);
+ const auto it = TakeInputChannelDataRequests.find(ev->Cookie);
YQL_ENSURE(it != TakeInputChannelDataRequests.end());
+ const auto channelId = it->second.ChannelId;
+ const auto watermark = it->second.Watermark;
+
CA_LOG_T("Input data push finished. Cookie: " << ev->Cookie
- << " Watermark: " << it->second.Watermark
+ << " Watermark: " << watermark
<< " Ack: " << it->second.Ack
<< " TakeInputChannelDataRequests: " << TakeInputChannelDataRequests.size()
- << " WatermarkTakeInputChannelDataRequests: " << WatermarkTakeInputChannelDataRequests.size());
-
- if (it->second.Watermark.Defined()) {
- auto& ct = WatermarkTakeInputChannelDataRequests.at(*it->second.Watermark);
- if (--ct == 0) {
- WatermarkTakeInputChannelDataRequests.erase(*it->second.Watermark);
- }
- }
+ );
- TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr(it->second.ChannelId);
+ TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr(channelId);
Y_ABORT_UNLESS(inputChannel);
+
inputChannel->FreeSpace = ev->Get()->FreeSpace;
+ if (watermark) {
+ if (WatermarksTracker.NotifyInChannelWatermarkReceived( channelId, *watermark)) {
+ CA_LOG_T("Pause input channel " << channelId << " because of watermark");
+ inputChannel->Pause(*watermark); // XXX does nothing in async CA
+ }
+ }
+
if (it->second.Ack) {
- Channels->SendChannelDataAck(it->second.ChannelId, inputChannel->FreeSpace);
+ Channels->SendChannelDataAck(channelId, inputChannel->FreeSpace);
}
TakeInputChannelDataRequests.erase(it);
@@ -1155,14 +1131,6 @@ private:
ContinueRunEvent = std::make_unique<NTaskRunnerActor::TEvContinueRun>();
}
ContinueRunEvent->CheckpointOnly = checkpointOnly;
- if (TMaybe<TInstant> watermarkRequest = GetWatermarkRequest()) {
- if (!ContinueRunEvent->WatermarkRequest) {
- ContinueRunEvent->WatermarkRequest.ConstructInPlace();
- ContinueRunEvent->WatermarkRequest->Watermark = *watermarkRequest;
- } else {
- ContinueRunEvent->WatermarkRequest->Watermark = Max(ContinueRunEvent->WatermarkRequest->Watermark, *watermarkRequest);
- }
- }
if (checkpointRequest) {
if (!ContinueRunEvent->CheckpointRequest) {
ContinueRunEvent->CheckpointRequest.ConstructInPlace(*checkpointRequest);
@@ -1171,6 +1139,11 @@ private:
Y_ABORT_UNLESS(ContinueRunEvent->CheckpointRequest->Checkpoint.GetId() == checkpointRequest->GetId());
}
}
+ if (auto watermarkRequest = WatermarksTracker.GetPendingWatermark()) {
+ Y_ENSURE(*watermarkRequest >= ContinueRunEvent->WatermarkRequest);
+ ContinueRunEvent->WatermarkRequest = *watermarkRequest;
+ MetricsReporter.ReportInjectedToTaskRunnerWatermark(*watermarkRequest);
+ }
if (!UseCpuQuota()) {
Send(TaskRunnerActorId, ContinueRunEvent.release());
@@ -1240,9 +1213,6 @@ private:
TMaybe<TInstant> Watermark;
};
THashMap<ui64, TTakeInputChannelData> TakeInputChannelDataRequests;
- // Watermark should be injected to task runner only after all precending data is injected
- // This hash map will help to track the right moment
- THashMap<TInstant, ui32> WatermarkTakeInputChannelDataRequests;
ui64 Cookie = 0;
NDq::TDqTaskRunnerStatsView TaskRunnerStats;
bool ReadyToCheckpointFlag;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h
index 22aa7e6f586..42bc5e8916b 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_input_helper.h
@@ -45,8 +45,9 @@ public:
}
void ResumeByWatermark(TInstant watermark) {
- YQL_ENSURE(watermark == PendingWatermark);
- PendingWatermark = Nothing();
+ if (watermark >= PendingWatermark) {
+ PendingWatermark = Nothing();
+ }
}
virtual i64 GetFreeSpace() const = 0;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index 9f805d82591..037766a7fa3 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -55,6 +55,7 @@ enum class EResumeSource : ui32 {
CADataSent,
CAPendingOutput,
CATaskRunnerCreated,
+ CAWatermarkInject,
Last,
};
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index dbf4141f953..7618a3119cc 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -764,6 +764,7 @@ protected: //TDqComputeActorCheckpoints::ICallbacks
sourceInfo.ResumeByWatermark(watermark);
}
+ // XXX Does nothing in async CA, not used (yet) in sync CA
for (auto& [id, channelInfo] : InputChannelsMap) {
if (channelInfo.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) {
continue;
@@ -839,7 +840,6 @@ protected:
ui32 SrcStageId;
IDqInputChannel::TPtr Channel;
bool HasPeer = false;
- std::queue<TInstant> PendingWatermarks;
const NDqProto::EWatermarksMode WatermarksMode;
std::optional<NDqProto::TCheckpoint> PendingCheckpoint;
const NDqProto::ECheckpointingMode CheckpointingMode;
@@ -860,13 +860,15 @@ protected:
}
bool IsPaused() const {
- return !PendingWatermarks.empty() || PendingCheckpoint.has_value();
+ return PendingCheckpoint.has_value();
}
void Pause(TInstant watermark) {
YQL_ENSURE(WatermarksMode != NDqProto::WATERMARKS_MODE_DISABLED);
- PendingWatermarks.emplace(watermark);
+ if (Channel) {
+ Channel->AddWatermark(watermark);
+ }
}
void Pause(const NDqProto::TCheckpoint& checkpoint) {
@@ -874,24 +876,20 @@ protected:
YQL_ENSURE(CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED);
PendingCheckpoint = checkpoint;
if (Channel) { // async actor doesn't hold channels, so channel is paused in task runner actor
- Channel->Pause();
+ Channel->PauseByCheckpoint();
}
}
void ResumeByWatermark(TInstant watermark) {
- while (!PendingWatermarks.empty() && PendingWatermarks.front() <= watermark) {
- if (PendingWatermarks.front() != watermark) {
- CA_LOG_W("Input channel " << ChannelId <<
- " watermarks were collapsed. See YQ-1441. Dropped watermark: " << PendingWatermarks.front());
- }
- PendingWatermarks.pop();
+ if (Channel) {
+ Channel->ResumeByWatermark(watermark);
}
}
void ResumeByCheckpoint() {
PendingCheckpoint.reset();
if (Channel) { // async actor doesn't hold channels, so channel is resumed in task runner actor
- Channel->Resume();
+ Channel->ResumeByCheckpoint();
}
}
@@ -1609,18 +1607,11 @@ protected:
void InitializeTask() {
for (ui32 i = 0; i < Task.InputsSize(); ++i) {
const auto& inputDesc = Task.GetInputs(i);
+ auto watermarksMode = NDqProto::WATERMARKS_MODE_DISABLED;
Y_ABORT_UNLESS(!inputDesc.HasSource() || inputDesc.ChannelsSize() == 0); // HasSource => no channels
- if (inputDesc.HasTransform()) {
- auto result = InputTransformsMap.emplace(
- i,
- TAsyncInputTransformHelper(LogPrefix, i, NDqProto::WATERMARKS_MODE_DISABLED)
- );
- YQL_ENSURE(result.second);
- }
-
if (inputDesc.HasSource()) {
- const auto watermarksMode = inputDesc.GetSource().GetWatermarksMode();
+ watermarksMode = inputDesc.GetSource().GetWatermarksMode();
auto result = SourcesMap.emplace(
i,
static_cast<TDerived*>(this)->CreateInputHelper(LogPrefix, i, watermarksMode)
@@ -1628,18 +1619,30 @@ protected:
YQL_ENSURE(result.second);
} else {
for (auto& channel : inputDesc.GetChannels()) {
+ auto channelWatermarksMode = channel.GetWatermarksMode();
+ if (channelWatermarksMode != NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) {
+ watermarksMode = channelWatermarksMode;
+ }
auto result = InputChannelsMap.emplace(
channel.GetId(),
TInputChannelInfo(
LogPrefix,
channel.GetId(),
channel.GetSrcStageId(),
- channel.GetWatermarksMode(),
+ channelWatermarksMode,
channel.GetCheckpointingMode())
);
YQL_ENSURE(result.second);
}
}
+
+ if (inputDesc.HasTransform()) {
+ auto result = InputTransformsMap.emplace(
+ i,
+ TAsyncInputTransformHelper(LogPrefix, i, watermarksMode)
+ );
+ YQL_ENSURE(result.second);
+ }
}
for (ui32 i = 0; i < Task.OutputsSize(); ++i) {
@@ -1694,12 +1697,6 @@ private:
WatermarksTracker.RegisterInputChannel(id);
}
}
-
- for (const auto& [id, channel] : OutputChannelsMap) {
- if (channel.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DEFAULT) {
- WatermarksTracker.RegisterOutputChannel(id);
- }
- }
}
virtual const NYql::NDq::TDqTaskRunnerStats* GetTaskRunnerStats() = 0;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp
index 91b60286afa..020441a11f7 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp
@@ -136,10 +136,10 @@ void TDqComputeActorMetrics::ReportInjectedToOutputsWatermark(TInstant watermark
}
InjectedToOutputsWatermark->Set(watermark.MilliSeconds());
- auto iter = WatermarkStartedAt.find(watermark);
- if (iter != WatermarkStartedAt.end()) {
- WatermarkCollectLatency->Collect((TInstant::Now() - iter->second).MilliSeconds());
- WatermarkStartedAt.erase(iter);
+ auto iter = WatermarkStartedAt.upper_bound(watermark);
+ if (iter != WatermarkStartedAt.begin()) {
+ WatermarkCollectLatency->Collect((TInstant::Now() - WatermarkStartedAt.begin()->second).MilliSeconds());
+ WatermarkStartedAt.erase(WatermarkStartedAt.begin(), iter);
}
}
@@ -162,9 +162,13 @@ NMonitoring::TDynamicCounterPtr TDqComputeActorMetrics::GetInputChannelCounters(
}
void TDqComputeActorMetrics::ReportInputWatermarkMetrics(NMonitoring::TDynamicCounterPtr& counters, TInstant watermark) {
+ if (!Enable) {
+ return;
+ }
counters->GetCounter("watermark_ms")->Set(watermark.MilliSeconds());
- if (!WatermarkStartedAt.contains(watermark)) {
- WatermarkStartedAt[watermark] = TInstant::Now();
+ auto [it, inserted] = WatermarkStartedAt.emplace(watermark, TInstant::Zero());
+ if (inserted) {
+ it->second = TInstant::Now();
}
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h
index 6044acd65d2..1a8022e04dd 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h
@@ -55,7 +55,7 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr AsyncInputError;
NMonitoring::TDynamicCounters::TCounterPtr OtherEvent;
- THashMap<TInstant, TInstant> WatermarkStartedAt;
+ TMap<TInstant, TInstant> WatermarkStartedAt;
};
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp
index 98c5c9d8ab3..a067ae7e366 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp
@@ -31,26 +31,34 @@ void TDqComputeActorWatermarks::RegisterAsyncInput(ui64 inputId) {
AsyncInputsWatermarks[inputId] = Nothing();
}
-void TDqComputeActorWatermarks::RegisterInputChannel(ui64 inputId) {
- InputChannelsWatermarks[inputId] = Nothing();
+void TDqComputeActorWatermarks::UnregisterAsyncInput(ui64 inputId) {
+ auto found = AsyncInputsWatermarks.erase(inputId);
+ Y_ENSURE(found);
+ RecalcPendingWatermark();
}
-void TDqComputeActorWatermarks::RegisterOutputChannel(ui64 outputId) {
- OutputChannelsWatermarks[outputId] = Nothing();
+void TDqComputeActorWatermarks::RegisterInputChannel(ui64 inputId) {
+ InputChannelsWatermarks[inputId] = Nothing();
}
-bool TDqComputeActorWatermarks::HasOutputChannels() const {
- return !OutputChannelsWatermarks.empty();
+void TDqComputeActorWatermarks::UnregisterInputChannel(ui64 inputId) {
+ auto found = InputChannelsWatermarks.erase(inputId);
+ Y_ENSURE(found);
+ RecalcPendingWatermark();
}
bool TDqComputeActorWatermarks::NotifyAsyncInputWatermarkReceived(ui64 inputId, TInstant watermark) {
+ auto it = AsyncInputsWatermarks.find(inputId);
+ if (it == AsyncInputsWatermarks.end()) {
+ LOG_D("Ignored watermark notification on unregistered async input " << inputId);
+ return false;
+ }
+
LOG_T("Async input " << inputId << " notified about watermark " << watermark);
- auto& asyncInputWatermark = AsyncInputsWatermarks[inputId];
- if (!asyncInputWatermark || *asyncInputWatermark < watermark) {
+ auto& asyncInputWatermark = it->second;
+ if (UpdateAndRecalcPendingWatermark(asyncInputWatermark, watermark)) {
LOG_T("Async input " << inputId << " watermark was updated to " << watermark);
- asyncInputWatermark = watermark;
- RecalcPendingWatermark();
return true;
}
@@ -58,38 +66,60 @@ bool TDqComputeActorWatermarks::NotifyAsyncInputWatermarkReceived(ui64 inputId,
}
bool TDqComputeActorWatermarks::NotifyInChannelWatermarkReceived(ui64 inputId, TInstant watermark) {
+ auto it = InputChannelsWatermarks.find(inputId);
+ if (it == InputChannelsWatermarks.end()) {
+ LOG_D("Ignored watermark notification on unregistered input channel" << inputId);
+ return false;
+ }
+
LOG_T("Input channel " << inputId << " notified about watermark " << watermark);
- auto& inputChannelWatermark = InputChannelsWatermarks[inputId];
- if (!inputChannelWatermark || *inputChannelWatermark < watermark) {
+ auto& inputChannelWatermark = it->second;
+ if (UpdateAndRecalcPendingWatermark(inputChannelWatermark, watermark)) {
LOG_T("Input channel " << inputId << " watermark was updated to " << watermark);
- inputChannelWatermark = watermark;
- RecalcPendingWatermark();
return true;
}
return false;
}
-bool TDqComputeActorWatermarks::NotifyOutputChannelWatermarkSent(ui64 outputId, TInstant watermark) {
- auto logPrefix = TStringBuilder() << "Output channel "
- << outputId << " notified about watermark '" << watermark << "'";
+// Updates the stored watermark and, when needed, recalculates the pending watermark
+bool TDqComputeActorWatermarks::UpdateAndRecalcPendingWatermark(TMaybe<TInstant>& storedWatermark, TInstant watermark) {
+ if (storedWatermark < watermark) {
+ auto oldWatermark = std::exchange(storedWatermark, watermark);
+ if (LastWatermark == oldWatermark) {
+ // Case 1: both LastWatermark and the old stored value were unset
+ // -> every input may now have a watermark set, so a recalc is needed
+ // Case 2: LastWatermark was set and equal to the old stored value
+ // -> LastWatermark may now be able to advance, so a recalc is needed
+ RecalcPendingWatermark();
+ } else {
+ // otherwise LastWatermark stays the same
+ }
+ return true;
+ }
+ return false;
+}
- LOG_T(logPrefix);
+bool TDqComputeActorWatermarks::NotifyWatermarkWasSent(TInstant watermark) {
+ auto logPrefix = [watermark] {
+ return TStringBuilder() << "Output notified about watermark '" << watermark << "'";
+ };
- if (watermark <= LastWatermark) {
- LOG_E(logPrefix << "' when LastWatermark was already forwarded to " << *LastWatermark);
- // We will try to ignore this error, but something strange happened
+ LOG_T(logPrefix());
+
+ if (watermark < PendingWatermark) {
+ LOG_D(logPrefix() << " before '" << PendingWatermark << "'");
+ return false;
}
- if (watermark != PendingWatermark) {
- LOG_E(logPrefix << " when '" << PendingWatermark << "' was expected");
+ if (watermark > PendingWatermark) {
+ LOG_E(logPrefix() << " when '" << PendingWatermark << "' was expected");
// We will try to ignore this error, but something strange happened
+ return false;
}
- OutputChannelsWatermarks[outputId] = watermark;
-
- return MaybePopPendingWatermark();
+ return true;
}
bool TDqComputeActorWatermarks::HasPendingWatermark() const {
@@ -122,41 +152,13 @@ void TDqComputeActorWatermarks::RecalcPendingWatermark() {
newWatermark = std::min(newWatermark, *watermark);
}
- if (!LastWatermark || newWatermark != LastWatermark) {
+ if (newWatermark > LastWatermark) {
LOG_T("New pending watermark " << newWatermark);
PendingWatermark = newWatermark;
+ LastWatermark = newWatermark;
}
}
-bool TDqComputeActorWatermarks::MaybePopPendingWatermark() {
- if (OutputChannelsWatermarks.empty()) {
- return true;
- }
-
- if (!PendingWatermark) {
- LOG_E("There is no pending watermark, but pop was called");
- // We will try to ignore this error, but something strange happened
- return true;
- }
-
- auto outWatermark = TInstant::Max();
- for (const auto& [_, watermark] : OutputChannelsWatermarks) {
- if (!watermark) {
- return false;
- }
-
- outWatermark = std::min(outWatermark, *watermark);
- }
-
- if (outWatermark >= *PendingWatermark) {
- LastWatermark = PendingWatermark;
- PopPendingWatermark();
- return true;
- }
-
- return false;
-}
-
void TDqComputeActorWatermarks::PopPendingWatermark() {
LOG_T("Watermark " << *PendingWatermark << " was popped. ");
PendingWatermark = Nothing();
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h
index 1ac5d5326c2..1f7ca1b691c 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h
@@ -13,8 +13,9 @@ public:
void RegisterAsyncInput(ui64 inputId);
void RegisterInputChannel(ui64 inputId);
- void RegisterOutputChannel(ui64 outputId);
- bool HasOutputChannels() const;
+
+ void UnregisterAsyncInput(ui64 inputId);
+ void UnregisterInputChannel(ui64 inputId);
// Will return true, if local watermark inside this async input was moved forward.
// CA should pause this async input and wait for corresponding watermarks in all other sources/inputs.
@@ -24,9 +25,8 @@ public:
// CA should pause this input channel and wait for corresponding watermarks in all other sources/inputs.
bool NotifyInChannelWatermarkReceived(ui64 inputId, TInstant watermark);
- // Will return true, if watermark was sent to all registered outputs.
- // CA should resume inputs and sources in this case
- bool NotifyOutputChannelWatermarkSent(ui64 outputId, TInstant watermark);
+ // Will return true, if pending watermark completed.
+ bool NotifyWatermarkWasSent(TInstant watermark);
bool HasPendingWatermark() const;
TMaybe<TInstant> GetPendingWatermark() const;
@@ -36,6 +36,7 @@ public:
private:
void RecalcPendingWatermark();
+ bool UpdateAndRecalcPendingWatermark(TMaybe<TInstant>& storedWatermark, TInstant watermark);
bool MaybePopPendingWatermark();
private:
@@ -43,7 +44,6 @@ private:
std::unordered_map<ui64, TMaybe<TInstant>> AsyncInputsWatermarks;
std::unordered_map<ui64, TMaybe<TInstant>> InputChannelsWatermarks;
- std::unordered_map<ui64, TMaybe<TInstant>> OutputChannelsWatermarks;
TMaybe<TInstant> PendingWatermark;
TMaybe<TInstant> LastWatermark;
diff --git a/ydb/library/yql/dq/actors/compute/ut/dq_async_compute_actor_ut.cpp b/ydb/library/yql/dq/actors/compute/ut/dq_async_compute_actor_ut.cpp
index 215e2d6e1ed..d8aaa41b646 100644
--- a/ydb/library/yql/dq/actors/compute/ut/dq_async_compute_actor_ut.cpp
+++ b/ydb/library/yql/dq/actors/compute/ut/dq_async_compute_actor_ut.cpp
@@ -1,5 +1,7 @@
#include <library/cpp/testing/unittest/registar.h>
+#include <util/string/cast.h>
+
#include <ydb/library/actors/testlib/test_runtime.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/services/services.pb.h>
@@ -37,8 +39,8 @@ namespace NYql::NDq {
namespace {
static const bool TESTS_VERBOSE = getenv("TESTS_VERBOSE") != nullptr;
-#define LOG_D(stream) LOG_DEBUG_S(*ActorSystem.SingleSys(), NKikimrServices::KQP_COMPUTE, LogPrefix << stream);
-#define LOG_E(stream) LOG_ERROR_S(*ActorSystem.SingleSys(), NKikimrServices::KQP_COMPUTE, LogPrefix << stream);
+#define LOG_D(stream) LOG_DEBUG_S(*ActorSystem.SingleSys(), NKikimrServices::KQP_COMPUTE, LogPrefix << stream)
+#define LOG_E(stream) LOG_ERROR_S(*ActorSystem.SingleSys(), NKikimrServices::KQP_COMPUTE, LogPrefix << stream)
struct TMockHttpRequest : NMonitoring::IMonHttpRequest {
TStringStream Out;
TCgiParameters Params;
@@ -111,6 +113,7 @@ struct TActorSystem: NActors::TTestActorRuntimeBase {
if (TESTS_VERBOSE) {
SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::EPriority::PRI_TRACE);
+ SetLogPriority(NKikimrServices::DQ_TASK_RUNNER, NActors::NLog::EPriority::PRI_TRACE);
}
}
};
@@ -125,8 +128,8 @@ NDq::IDqAsyncIoFactory::TPtr CreateAsyncIoFactory() {
}
struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
- static constexpr ui64 InputChannelId = 1;
- static constexpr ui64 OutputChannelId = 2;
+ static constexpr ui64 InputChannelId = 1000;
+ static constexpr ui64 OutputChannelId = 2000;
static constexpr ui32 InputStageId = 123;
static constexpr ui32 ThisStageId = 456;
static constexpr ui32 OutputStageId = 789;
@@ -137,7 +140,7 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
static constexpr i32 MaxTransformedValue = 10;
TActorSystem ActorSystem;
TActorId EdgeActor;
- TActorId SrcEdgeActor;
+ std::unordered_map<ui64, TActorId> SrcEdgeActor; // ChannelId -> actor
TActorId DstEdgeActor;
TScopedAlloc Alloc;
@@ -196,7 +199,6 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
ActorSystem.Start();
EdgeActor = ActorSystem.AllocateEdgeActor();
- SrcEdgeActor = ActorSystem.AllocateEdgeActor();
DstEdgeActor = ActorSystem.AllocateEdgeActor();
}
@@ -388,13 +390,16 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
// Adds dummy input channel with channelId
// returns IDqOutputChannel::TPtr that can be used to inject data/checkpoints/watermarks into channel
- auto AddDummyInputChannel(NDqProto::TDqTask& task, ui64 channelId) {
- auto& input = *task.AddInputs();
+ auto AddDummyInputChannel(NDqProto::TTaskInput& input, ui64 channelId) {
auto& channel = *input.AddChannels();
input.MutableUnionAll(); // for side-effect
channel.SetId(channelId);
+ const auto& [srcEdgeActor, inserted] = SrcEdgeActor.try_emplace(channelId);
+ if (inserted) {
+ srcEdgeActor->second = ActorSystem.AllocateEdgeActor();
+ }
auto& chEndpoint = *channel.MutableSrcEndpoint();
- ActorIdToProto(SrcEdgeActor, chEndpoint.MutableActorId());
+ ActorIdToProto(srcEdgeActor->second, chEndpoint.MutableActorId());
channel.SetWatermarksMode(NDqProto::WATERMARKS_MODE_DEFAULT);
channel.SetCheckpointingMode(NDqProto::CHECKPOINTING_MODE_DEFAULT);
channel.SetInMemory(true);
@@ -412,12 +417,29 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
TDqOutputChannelSettings settings;
settings.TransportVersion = TransportVersion;
settings.MutableSettings.IsLocalChannel = true;
+ settings.Level = TCollectStatsLevel::Profile;
return CreateDqOutputChannel(channelId, ThisStageId,
(IsWide ? static_cast<TType*>(WideRowType) : RowType), HolderFactory,
settings,
logFunc);
}
+ auto AddDummyInputChannel(NDqProto::TDqTask& task, ui64 channelId) {
+ auto& input = *task.AddInputs();
+ return AddDummyInputChannel(input, channelId);
+ }
+
+ auto AddDummyInputChannels(NDqProto::TDqTask& task, ui64 baseChannelId, ui64 numChannels) {
+ auto& input = *task.AddInputs();
+ TVector<IDqOutputChannel::TPtr> fakeOutputs;
+
+ for (; numChannels--; ++baseChannelId) {
+ fakeOutputs.push_back(AddDummyInputChannel(input, baseChannelId));
+ }
+
+ return fakeOutputs;
+ }
+
// Adds dummy output channel with channelId
// returns IDqInputChannel::TPtr that can be used to simulate reading from this channel
auto AddDummyOutputChannel(NDqProto::TDqTask& task, ui64 channelId, TType* type) {
@@ -443,14 +465,14 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
ThisStageId,
type,
10_MB,
- TCollectStatsLevel::None,
+ TCollectStatsLevel::Profile,
TypeEnv,
HolderFactory,
TransportVersion,
NKikimr::NMiniKQL::EValuePackerVersion::V0);
}
- auto CreateTestAsyncCA(NDqProto::TDqTask& task) {
+ auto CreateTestAsyncCA(NDqProto::TDqTask& task, NDqProto::EDqStatsMode statsMode = NDqProto::DQ_STATS_MODE_PROFILE) {
TVector<NKikimr::NMiniKQL::TComputationNodeFactory> compNodeFactories = {
NYql::GetCommonDqFactory(),
NKikimr::NMiniKQL::GetYqlFactory()
@@ -474,17 +496,19 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
});
TComputeMemoryLimits memoryLimits;
memoryLimits.ChannelBufferSize = 1_MB;
- memoryLimits.MkqlLightProgramMemoryLimit = 10_MB;
- memoryLimits.MkqlHeavyProgramMemoryLimit = 20_MB;
- memoryLimits.MkqlProgramHardMemoryLimit = 30_MB;
+ memoryLimits.MkqlLightProgramMemoryLimit = 20_MB;
+ memoryLimits.MkqlHeavyProgramMemoryLimit = 30_MB;
+ memoryLimits.MkqlProgramHardMemoryLimit = 40_MB;
memoryLimits.MemoryQuotaManager = std::make_shared<TGuaranteeQuotaManager>(64_MB, 32_MB);
+ TComputeRuntimeSettings runtimeSettings;
+ runtimeSettings.StatsMode = statsMode;
auto actor = CreateDqAsyncComputeActor(
EdgeActor, // executerId,
LogPrefix,
&task, // NYql::NDqProto::TDqTask* task,
CreateAsyncIoFactory(),
FunctionRegistry.Get(),
- {}, // TComputeRuntimeSettings& settings,
+ runtimeSettings,
memoryLimits,
taskRunnerActorFactory,
{}, // ::NMonitoring::TDynamicCounterPtr taskCounters,
@@ -496,6 +520,7 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
}
TUnboxedValueBatch CreateRow(ui32 value, ui64 ts) {
+ LOG_D("create " << value << " " << ts);
if (IsWide) {
TUnboxedValueBatch result(WideRowType);
result.PushRow([&](ui32 idx) {
@@ -521,13 +546,21 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
}
}
- bool ReceiveData(auto&& cb, auto&& cbWatermark, auto dqInputChannel) {
- auto ev = ActorSystem.GrabEdgeEvent<TEvDqCompute::TEvChannelData>({DstEdgeActor});
+ // cb(TUnboxedValue& value, ui32 column) is called for each value in a row
+ // cbWatermark(TInstant watermark) is called for each received watermark
+ // beforeFinalAck() is called before sending final ack (when CA is still definitely alive)
+ bool ReceiveData(auto&& cb, auto&& cbWatermark, auto&& beforeFinalAck, auto dqInputChannel) {
+ auto ev = ActorSystem.GrabEdgeEvent<TEvDqCompute::TEvChannelData>({DstEdgeActor}, TDuration::Seconds(20));
+ if (!ev) {
+ throw yexception() << "Failed";
+ }
LOG_D("Got " << ev->Get()->Record.DebugString());
TDqSerializedBatch sbatch;
- sbatch.Proto = ev->Get()->Record.GetChannelData().GetData();
+ auto& channelData = *ev->Get()->Record.MutableChannelData();
+ sbatch.Proto = std::move(*channelData.MutableData());
dqInputChannel->Push(std::move(sbatch));
- if (ev->Get()->Record.GetChannelData().GetFinished()) {
+ bool finished = channelData.GetFinished();
+ if (finished) {
dqInputChannel->Finish();
}
TUnboxedValueBatch batch;
@@ -566,22 +599,34 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
LOG_D("null");
UNIT_ASSERT(false);
}
+ LOG_D("/");
return true;
})) {
return false;
}
}
}
- if (ev->Get()->Record.GetChannelData().HasWatermark()) {
- auto watermark = TInstant::MicroSeconds(ev->Get()->Record.GetChannelData().GetWatermark().GetTimestampUs());
+ if (channelData.HasWatermark()) {
+ auto watermark = TInstant::MicroSeconds(channelData.GetWatermark().GetTimestampUs());
cbWatermark(watermark);
}
+ if (dqInputChannel->IsFinished()) {
+ beforeFinalAck();
+ }
+ if (!ev->Get()->Record.GetNoAck()) {
+ auto ack = new TEvDqCompute::TEvChannelDataAck;
+ ack->Record.SetChannelId(channelData.GetChannelId());
+ ack->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
+ ack->Record.SetFreeSpace(3123); // XXX simulates limited channel size
+ ack->Record.SetFinish(channelData.GetFinished());
+ ActorSystem.Send(ev->Sender, ev->Recipient, ack);
+ }
return !dqInputChannel->IsFinished();
}
void WaitForChannelDataAck(auto channelId, auto seqNo) {
for (;;) {
- auto ev = ActorSystem.GrabEdgeEvent<TEvDqCompute::TEvChannelDataAck>({SrcEdgeActor});
+ auto ev = ActorSystem.GrabEdgeEvent<TEvDqCompute::TEvChannelDataAck>(SrcEdgeActor[channelId]);
LOG_D("Got ack " << ev->Get()->Record);
UNIT_ASSERT_EQUAL(ev->Get()->Record.GetChannelId(), channelId);
if (ev->Get()->Record.GetSeqNo() == seqNo) {
@@ -606,111 +651,191 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
}
}
- void BasicTests(ui32 packets, bool doWatermark, bool waitIntermediateAcks) {
- LogPrefix = TStringBuilder() << "Square Test for:"
- << " packets=" << packets
- << " doWatermark=" << doWatermark
- << " waitIntermediateAcks=" << waitIntermediateAcks
- << " ";
- NDqProto::TDqTask task;
- GenerateSquareProgram(task, [](auto& ctx) {
- return ctx.template MakeType<TDataExprType>(EDataSlot::Int32);
- });
- auto dqOutputChannel = AddDummyInputChannel(task, InputChannelId);
- auto dqInputChannel = AddDummyOutputChannel(task, OutputChannelId, (IsWide ? static_cast<TType*>(WideRowType) : RowType));
+ static constexpr ui32 NoAckPeriod = 2;
- auto asyncCA = CreateTestAsyncCA(task);
- ActorSystem.EnableScheduleForActor(asyncCA, true);
- ui32 seqNo = 0;
- ui32 val = 0;
+ void SendData(auto&& generator, const auto& asyncCA, ui32 packets, bool waitIntermediateAcks) {
for (ui32 packet = 1; packet <= packets; ++packet) {
bool isFinal = packet == packets;
- bool noAck = (packet % 2) == 0; // set noAck on even packets
+ bool noAck = (packet % NoAckPeriod) == 0; // set noAck on even packets
- PushRow(CreateRow(++val, packet), dqOutputChannel);
- PushRow(CreateRow(++val, packet), dqOutputChannel);
- PushRow(CreateRow(++val, packet), dqOutputChannel);
- if (doWatermark) {
- NDqProto::TWatermark watermark;
- watermark.SetTimestampUs(TInstant::Seconds(packet).MicroSeconds());
- dqOutputChannel->Push(std::move(watermark));
- }
+ auto [dqOutputChannel, seqNo] = generator(packet, isFinal);
if (isFinal) {
dqOutputChannel->Finish();
}
auto evInputChannelData = MakeHolder<TEvDqCompute::TEvChannelData>();
- evInputChannelData->Record.SetSeqNo(++seqNo);
- evInputChannelData->Record.SetNoAck(noAck);
+ evInputChannelData->Record.SetSeqNo(++*seqNo);
auto& chData = *evInputChannelData->Record.MutableChannelData();
+ auto channelId = dqOutputChannel->GetChannelId();
+ chData.SetChannelId(channelId);
if (TDqSerializedBatch serializedBatch; dqOutputChannel->Pop(serializedBatch)) {
*chData.MutableData() = serializedBatch.Proto;
Y_ENSURE(serializedBatch.Payload.Empty()); // TODO
}
if (NDqProto::TWatermark watermark; dqOutputChannel->Pop(watermark)) {
*chData.MutableWatermark() = watermark;
+ noAck = false; // packet containing watermark must be acked
}
if (NDqProto::TCheckpoint checkpoint; dqOutputChannel->Pop(checkpoint)) {
*chData.MutableCheckpoint() = checkpoint;
+ noAck = false; // packet containing checkpoint must be acked
+ }
+ if (dqOutputChannel->IsFinished()) {
+ chData.SetFinished(true);
+ noAck = false; // final packet must be acked
}
- chData.SetChannelId(InputChannelId);
- chData.SetFinished(dqOutputChannel->IsFinished());
+ evInputChannelData->Record.SetNoAck(noAck);
LOG_D("Sending " << packet << "/" << packets << " " << chData);
- ActorSystem.Send(asyncCA, SrcEdgeActor, evInputChannelData.Release());
- if ((isFinal || waitIntermediateAcks) && !noAck) {
- WaitForChannelDataAck(InputChannelId, seqNo);
+ ActorSystem.Send(asyncCA, SrcEdgeActor[channelId], evInputChannelData.Release());
+ if ((dqOutputChannel->IsFinished() || waitIntermediateAcks) && !noAck) {
+ WaitForChannelDataAck(dqOutputChannel->GetChannelId(), *seqNo);
}
}
+ }
- TMap<ui32, ui32> receivedData;
- TMaybe<TInstant> watermark;
- while (ReceiveData(
- [this, &receivedData, &watermark](const NUdf::TUnboxedValue& val, ui32 column) {
- UNIT_ASSERT(!!val);
- UNIT_ASSERT(val.IsEmbedded());
- if (RowType->GetMemberName(column) == "ts") {
- auto ts = val.Get<ui64>();
- if (watermark) {
- UNIT_ASSERT_GT(ts, watermark->Seconds());
- }
- return true;
- }
- UNIT_ASSERT_EQUAL(RowType->GetMemberName(column), "id");
- auto data = val.Get<i32>();
- LOG_D(data);
- ++receivedData[data];
- return true;
- },
- [this, &watermark](const auto& receivedWatermark) {
- watermark = receivedWatermark;
- LOG_D("Got watermark " << *watermark);
- },
- dqInputChannel))
- {}
- DumpMonPage(asyncCA, [this](auto&& str) {
- UNIT_ASSERT_STRING_CONTAINS(str, "<h3>Sources</h3>");
- UNIT_ASSERT_STRING_CONTAINS(str, LogPrefix);
- // TODO add validation
- LOG_D(str);
- });
- UNIT_ASSERT_EQUAL(receivedData.size(), val);
- for (; val > 0; --val) {
- UNIT_ASSERT_EQUAL_C(receivedData[val * val], 1, "expected count for " << (val * val));
- }
+ void SendFinish(const auto& asyncCA, auto dqOutputChannel, auto* seqNo) {
+ auto evInputChannelData = MakeHolder<TEvDqCompute::TEvChannelData>();
+ evInputChannelData->Record.SetSeqNo(++*seqNo);
+ auto& chData = *evInputChannelData->Record.MutableChannelData();
+ auto channelId = dqOutputChannel->GetChannelId();
+ chData.SetChannelId(channelId);
+ chData.SetFinished(true);
+ evInputChannelData->Record.SetNoAck(false);
+ LOG_D("Sending FINISH " << chData);
+ ActorSystem.Send(asyncCA, SrcEdgeActor[channelId], evInputChannelData.Release());
+ WaitForChannelDataAck(dqOutputChannel->GetChannelId(), *seqNo);
}
#if 0 // TODO: switch when inputtransform will be fixed; just log for now
#define WEAK_UNIT_ASSERT_GT_C UNIT_ASSERT_GT_C
+#define WEAK_UNIT_ASSERT_LE_C UNIT_ASSERT_LE_C
+#define WEAK_UNIT_ASSERT_EQUAL_C UNIT_ASSERT_EQUAL_C
#define WEAK_UNIT_ASSERT UNIT_ASSERT
#else
#define WEAK_UNIT_ASSERT_GT_C(A, B, C) do { if (!((A) > (B))) LOG_E("Assert " #A " > " #B " failed " << C); } while(0)
+#define WEAK_UNIT_ASSERT_LE_C(A, B, C) do { if (!((A) <= (B))) LOG_E("Assert " #A " <= " #B " failed " << C); } while(0)
+#define WEAK_UNIT_ASSERT_EQUAL_C(A, B, C) do { if (!((A) == (B))) LOG_E("Assert " #A " == " #B " failed " << C); } while(0)
#define WEAK_UNIT_ASSERT(A) do { if (!(A)) LOG_E("Assert " #A " failed "); } while(0)
#endif
- void InputTransformTests(ui32 packets, bool doWatermark, bool waitIntermediateAcks) {
+ void BasicMultichannelTests(ui32 packets, ui32 watermarkPeriod, bool waitIntermediateAcks, ui32 numChannels, auto& rng, NDqProto::EDqStatsMode statsMode = NDqProto::DQ_STATS_MODE_PROFILE) {
+ LogPrefix = TStringBuilder() << "Square Test for:"
+ << " packets=" << packets
+ << " watermarkPeriod=" << watermarkPeriod
+ << " waitIntermediateAcks=" << waitIntermediateAcks
+ << " channels=" << numChannels
+ << " ";
+ NDqProto::TDqTask task;
+ GenerateSquareProgram(task, [](auto& ctx) {
+ return ctx.template MakeType<TDataExprType>(EDataSlot::Int32);
+ });
+ auto dqOutputChannels = AddDummyInputChannels(task, InputChannelId, numChannels);
+ auto dqInputChannel = AddDummyOutputChannel(task, OutputChannelId, (IsWide ? static_cast<TType*>(WideRowType) : RowType));
+
+ auto asyncCA = CreateTestAsyncCA(task, statsMode);
+ ActorSystem.EnableScheduleForActor(asyncCA, true);
+ ActorSystem.GrabEdgeEvent<TEvDqCompute::TEvState>(EdgeActor);
+
+ ui32 val = 0;
+ TMaybe<TInstant> expectedWatermark;
+ TVector<ui32> seqNo(numChannels);
+ TVector<ui64> activeChannels(numChannels);
+ std::iota(activeChannels.begin(), activeChannels.end(), 0);
+ SendData([&](auto packet, bool isFinal) {
+ auto channelIdxIdx = rng() % activeChannels.size();
+ std::swap(activeChannels[channelIdxIdx], activeChannels.back());
+ auto channelIdx = activeChannels.back();
+ auto dqOutputChannel = dqOutputChannels[channelIdx];
+ PushRow(CreateRow(++val, packet), dqOutputChannel);
+ PushRow(CreateRow(++val, packet), dqOutputChannel);
+ PushRow(CreateRow(++val, packet), dqOutputChannel);
+ if (watermarkPeriod && packet % watermarkPeriod == 0) {
+ LOG_D("push watermark" << packet);
+ NDqProto::TWatermark watermark;
+ watermark.SetTimestampUs(TInstant::Seconds(packet).MicroSeconds());
+ dqOutputChannel->Push(std::move(watermark));
+ expectedWatermark = std::max(expectedWatermark, TMaybe<TInstant>(TInstant::Seconds(packet)));
+ }
+ if (isFinal || activeChannels.size() > 1 && rng() % std::max(packets/numChannels, ui32{1}) == 0) {
+ // when more than one active channel is left, we may randomly finish the current one midway
+ dqOutputChannel->Finish();
+ activeChannels.pop_back();
+ }
+ return std::pair { dqOutputChannel, &seqNo[channelIdx] };
+ },
+ asyncCA, packets, waitIntermediateAcks);
+ // Finish all unfinished channels (when we have more than one channel left, only one is forcibly finished on final packet)
+ for (ui32 channelIdx = 0; channelIdx < numChannels; ++channelIdx) {
+ auto dqOutputChannel = dqOutputChannels[channelIdx];
+ if (dqOutputChannel->IsFinished()) {
+ continue;
+ }
+ SendFinish(asyncCA, dqOutputChannel, &seqNo[channelIdx]);
+ }
+
+ TMap<ui32, ui32> receivedData;
+ TMaybe<TInstant> watermark;
+ try {
+ while (ReceiveData(
+ [this, &receivedData, &watermark](const NUdf::TUnboxedValue& val, ui32 column) {
+ UNIT_ASSERT(!!val);
+ UNIT_ASSERT(val.IsEmbedded());
+ if (RowType->GetMemberName(column) == "ts") {
+ auto ts = val.Get<ui64>();
+ if (watermark) {
+ UNIT_ASSERT_GT_C(ts, watermark->Seconds(), ts << " >= " << watermark->Seconds());
+ }
+ return true;
+ }
+ UNIT_ASSERT_EQUAL(RowType->GetMemberName(column), "id");
+ auto data = val.Get<i32>();
+ LOG_D(data);
+ ++receivedData[data];
+ return true;
+ },
+ [this, &watermark](const auto& receivedWatermark) {
+ watermark = receivedWatermark;
+ LOG_D("Got watermark " << *watermark);
+ },
+ [this, &asyncCA]() {
+ DumpMonPage(asyncCA, [this](auto&& str) {
+ UNIT_ASSERT_STRING_CONTAINS(str, "<h3>Sources</h3>");
+ UNIT_ASSERT_STRING_CONTAINS(str, LogPrefix);
+ // TODO add validation
+ LOG_D(str);
+ });
+ },
+ dqInputChannel))
+ {}
+ } catch(...) {
+ DumpMonPage(asyncCA, [this](auto&& str) {
+ LOG_D(str);
+ });
+ throw;
+ }
+ UNIT_ASSERT_EQUAL_C(receivedData.size(), val, "expected size " << val << " != " << receivedData.size());
+ for (; val > 0; --val) {
+ UNIT_ASSERT_EQUAL_C(receivedData[val * val], 1, "expected count for " << (val * val));
+ }
+ if (expectedWatermark) {
+ WEAK_UNIT_ASSERT(!!watermark);
+ if (watermark) {
+ UNIT_ASSERT_LE_C(*watermark, expectedWatermark, "Expected " << (*watermark) << " <= " << expectedWatermark);
+ WEAK_UNIT_ASSERT_EQUAL_C(*watermark, expectedWatermark, "Expected " << (*watermark) << " == " << expectedWatermark << ", Watermark Delay is " << (*expectedWatermark - *watermark));
+ LOG_D("Last watermark " << *watermark);
+ } else {
+ LOG_E("NO WATERMARK");
+ }
+ } else {
+ UNIT_ASSERT(!watermark);
+ }
+ }
+
+ void InputTransformMultichannelTests(ui32 packets, ui32 watermarkPeriod, bool waitIntermediateAcks, ui32 numChannels, auto& rng) {
LogPrefix = TStringBuilder() << "InputTransform Test for:"
<< " packets=" << packets
- << " doWatermark=" << doWatermark
+ << " watermarkPeriod=" << watermarkPeriod
<< " waitIntermediateAcks=" << waitIntermediateAcks
+ << " channels=" << numChannels
<< " ";
NDqProto::TDqTask task;
GenerateEmptyProgram(task, [](auto& ctx) {
@@ -728,7 +853,7 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
return structType;
});
TMap<i32, ui32> expectedData;
- auto dqOutputChannel = AddDummyInputChannel(task, InputChannelId);
+ auto dqOutputChannels = AddDummyInputChannels(task, InputChannelId, numChannels);
auto dqInputChannel = AddDummyOutputChannel(task, OutputChannelId, (IsWide ? static_cast<TType*>(WideRowTransformedType) : RowTransformedType));
SetInputTransform(task,
TDataType::Create(NUdf::TDataType<i32>::Id, TypeEnv),
@@ -737,12 +862,19 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
auto asyncCA = CreateTestAsyncCA(task);
ActorSystem.EnableScheduleForActor(asyncCA, true);
- ui32 seqNo = 0;
+ ActorSystem.GrabEdgeEvent<TEvDqCompute::TEvState>(EdgeActor);
+
ui32 val = 0;
- for (ui32 packet = 1; packet <= packets; ++packet) {
- bool isFinal = packet == packets;
- bool noAck = (packet % 2) == 0; // set noAck on even packets
+ TMaybe<TInstant> expectedWatermark;
+ TVector<ui32> seqNo(numChannels);
+ TVector<ui64> activeChannels(numChannels);
+ std::iota(activeChannels.begin(), activeChannels.end(), 0);
+ SendData([&](auto packet, bool isFinal) {
+ auto channelIdxIdx = rng() % activeChannels.size();
+ std::swap(activeChannels[channelIdxIdx], activeChannels.back());
+ auto channelIdx = activeChannels.back();
+ auto dqOutputChannel = dqOutputChannels[channelIdx];
PushRow(CreateRow(++val, packet), dqOutputChannel);
++expectedData[val];
PushRow(CreateRow(++val, packet), dqOutputChannel);
@@ -756,36 +888,27 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
++expectedData[val];
PushRow(CreateRow(++val, packet), dqOutputChannel);
++expectedData[val];
- if (doWatermark) {
+ if (watermarkPeriod && packet % watermarkPeriod == 0) {
NDqProto::TWatermark watermark;
watermark.SetTimestampUs(TInstant::Seconds(packet).MicroSeconds());
dqOutputChannel->Push(std::move(watermark));
+ expectedWatermark = std::max(expectedWatermark, TMaybe<TInstant>(TInstant::Seconds(packet)));
}
- if (isFinal) {
+ if (isFinal || activeChannels.size() > 1 && rng() % std::max(packets/numChannels, ui32{1}) == 0) {
+ // when more than one active channel is left, we may randomly finish the current one midway
dqOutputChannel->Finish();
+ activeChannels.pop_back();
}
-
- auto evInputChannelData = MakeHolder<TEvDqCompute::TEvChannelData>();
- evInputChannelData->Record.SetSeqNo(++seqNo);
- evInputChannelData->Record.SetNoAck(noAck);
- auto& chData = *evInputChannelData->Record.MutableChannelData();
- if (TDqSerializedBatch serializedBatch; dqOutputChannel->Pop(serializedBatch)) {
- *chData.MutableData() = serializedBatch.Proto;
- Y_ENSURE(serializedBatch.Payload.Empty()); // TODO
- }
- if (NDqProto::TWatermark watermark; dqOutputChannel->Pop(watermark)) {
- *chData.MutableWatermark() = watermark;
- }
- if (NDqProto::TCheckpoint checkpoint; dqOutputChannel->Pop(checkpoint)) {
- *chData.MutableCheckpoint() = checkpoint;
- }
- chData.SetChannelId(InputChannelId);
- chData.SetFinished(dqOutputChannel->IsFinished());
- LOG_D("Sending " << packet << "/" << packets << " " << chData);
- ActorSystem.Send(asyncCA, SrcEdgeActor, evInputChannelData.Release());
- if ((isFinal || waitIntermediateAcks) && !noAck) {
- WaitForChannelDataAck(InputChannelId, seqNo);
+ return std::pair { dqOutputChannel, &seqNo[channelIdx] };
+ },
+ asyncCA, packets, waitIntermediateAcks);
+ // Finish all unfinished channels (when we have more than one channel left, only one is forcibly finished on final packet)
+ for (ui32 channelIdx = 0; channelIdx < numChannels; ++channelIdx) {
+ auto dqOutputChannel = dqOutputChannels[channelIdx];
+ if (dqOutputChannel->IsFinished()) {
+ continue;
}
+ SendFinish(asyncCA, dqOutputChannel, &seqNo[channelIdx]);
}
TMap<i32, ui32> receivedData;
@@ -799,16 +922,16 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
if (columnName == "e.id") {
UNIT_ASSERT(!!val);
UNIT_ASSERT(val.IsEmbedded());
- LOG_D(column << "id = " << val.Get<i32>());
+ LOG_D(column << " id = " << val.Get<i32>());
col0 = val.Get<i32>();
++receivedData[val.Get<i32>()];
} else if (columnName == "e.ts") {
UNIT_ASSERT(!!val);
UNIT_ASSERT(val.IsEmbedded());
auto ts = val.Get<ui64>();
- LOG_D(column << "ts = " << ts);
+ LOG_D(column << " ts = " << ts);
if (watermark) {
- WEAK_UNIT_ASSERT_GT_C(ts, watermark->Seconds(), "Timestamp " << ts << " before watermark: " << watermark->Seconds());
+ UNIT_ASSERT_GT_C(ts, watermark->Seconds(), "Timestamp " << ts << " before watermark: " << watermark->Seconds());
}
} else if (columnName == "u.key") {
if (col0 >= MinTransformedValue && col0 <= MaxTransformedValue) {
@@ -817,10 +940,11 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
UNIT_ASSERT(!!cval);
UNIT_ASSERT(cval.IsEmbedded());
auto data = cval.Get<i32>();
- LOG_D(column << "key = " << data);
+ LOG_D(column << " key = " << data);
UNIT_ASSERT_EQUAL_C(data, col0, data << "!=" << col0);
} else {
UNIT_ASSERT_C(!val, "null (1) expected for " << col0);
+ LOG_D(column << " key IS NULL");
}
} else if (columnName == "u.data") {
if (col0 >= MinTransformedValue && col0 <= MaxTransformedValue) {
@@ -828,10 +952,11 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
const auto cval = val.GetOptionalValue();
UNIT_ASSERT(!!cval);
auto ref = TString(cval.AsStringRef());
- LOG_D(column << "data = '" << ref << "'");
+ LOG_D(column << " data = '" << ref << "'");
UNIT_ASSERT_EQUAL(ref, ToString(col0));
} else {
UNIT_ASSERT_C(!val, "null (2) expected for " << col0);
+ LOG_D(column << " data IS NULL");
}
} else {
UNIT_ASSERT_C(false, "Unexpected column " << column << " name " << columnName);
@@ -842,27 +967,48 @@ struct TAsyncCATestFixture: public NUnitTest::TBaseFixture {
watermark = receivedWatermark;
LOG_D("Got watermark " << *watermark);
},
+ [this, &asyncCA]() {
+ DumpMonPage(asyncCA, [this](auto&& str) {
+ UNIT_ASSERT_STRING_CONTAINS(str, "<h3>Sources</h3>");
+ UNIT_ASSERT_STRING_CONTAINS(str, LogPrefix);
+ // TODO add validation
+ LOG_D(str);
+ });
+ },
dqInputChannel))
{}
- DumpMonPage(asyncCA, [this](auto&& str) {
- UNIT_ASSERT_STRING_CONTAINS(str, "<h3>Sources</h3>");
- UNIT_ASSERT_STRING_CONTAINS(str, LogPrefix);
- // TODO add validation
- LOG_D(str);
- });
- UNIT_ASSERT_EQUAL(receivedData.size(), expectedData.size());
+ UNIT_ASSERT_EQUAL_C(receivedData.size(), expectedData.size(), "received " << receivedData.size() << " != expected " << expectedData.size());
for (auto [receivedVal, receivedCnt] : receivedData) {
UNIT_ASSERT_EQUAL_C(receivedCnt, expectedData[receivedVal], "expected count for " << receivedVal << ": " << receivedCnt << " != " << expectedData[receivedVal]);
}
- if (doWatermark) {
+ if (expectedWatermark) {
WEAK_UNIT_ASSERT(!!watermark);
if (watermark) {
+ UNIT_ASSERT_LE_C(*watermark, expectedWatermark, "Expected " << (*watermark) << " <= " << expectedWatermark);
+ WEAK_UNIT_ASSERT_EQUAL_C(*watermark, expectedWatermark, "Expected " << (*watermark) << " == " << expectedWatermark << ", Watermark Delay is " << (*expectedWatermark - *watermark));
LOG_D("Last watermark " << *watermark);
} else {
LOG_E("NO WATERMARK");
}
+ } else {
+ UNIT_ASSERT(!watermark);
}
}
+
+ auto GetRandomSeed() {
+ uint32_t seed = 0; // by default tests are reproducible (fixed-seed PRNG)
+ if (auto env = getenv("RANDOM_SEED")) {
+ if (*env) {
+ // with non-empty $RANDOM_SEED use it as seed (to reproduce random test failures)
+ seed = ::FromString<uint32_t>(env);
+ } else {
+ // with empty $RANDOM_SEED make tests truly random
+ seed = (std::random_device {})();
+ Cerr << "RANDOM_SEED=" << seed << Endl;
+ }
+ }
+ return seed;
+ }
};
} //namespace anonymous
@@ -871,20 +1017,44 @@ Y_UNIT_TEST_SUITE(TAsyncComputeActorTest) {
Y_UNIT_TEST_F(Empty, TAsyncCATestFixture) { }
Y_UNIT_TEST_F(Basic, TAsyncCATestFixture) {
+ TVector<ui32> sizes{ 1, 2, 3, 4, 5, 51, 128, 251 };
+ auto seed = GetRandomSeed();
+ std::mt19937 rng(seed);
+ for (ui32 t = 0; t < 16; ++t) sizes.push_back(1 + rng() % 734);
for (bool waitIntermediateAcks : { false, true }) {
- for (bool doWatermark : { false, true }) {
- for (ui32 packets : { 1, 2, 3, 4, 5 }) {
- BasicTests(packets, doWatermark, waitIntermediateAcks);
+ for (ui32 watermarkPeriod : { 0, 1, 3 }) {
+ for (ui32 packets : sizes) {
+ for (ui32 numChannels : { 1, 3, 7, 16 }) {
+ std::mt19937 trng(seed);
+ BasicMultichannelTests(packets, watermarkPeriod, waitIntermediateAcks, numChannels, trng);
+ }
}
}
}
}
- Y_UNIT_TEST_F(InputTransform, TAsyncCATestFixture) {
- for (bool waitIntermediateAcks : { false, true }) {
- for (bool doWatermark : { false, true }) {
- for (ui32 packets : { 1, 2, 3, 4, 5, 111 }) {
- InputTransformTests(packets, doWatermark, waitIntermediateAcks);
+ Y_UNIT_TEST_F(StatsMode, TAsyncCATestFixture) {
+ for (auto statsMode : {
+ NDqProto::DQ_STATS_MODE_NONE,
+ NDqProto::DQ_STATS_MODE_BASIC,
+ NDqProto::DQ_STATS_MODE_FULL,
+ NDqProto::DQ_STATS_MODE_PROFILE,
+ }) {
+ std::mt19937 rng;
+ BasicMultichannelTests(5, 1, true, 1, rng, statsMode);
+ }
+ }
+
+ Y_UNIT_TEST_F(InputTransformMultichannel, TAsyncCATestFixture) {
+ TVector<ui32> sizes{ 1, 2, 3, 4, 5, 51, 128, 251 };
+ std::mt19937 rng(GetRandomSeed());
+ for (ui32 t = 0; t < 16; ++t) sizes.push_back(1 + rng() % 734);
+ for (ui32 numChannels: { 1, 2, 7, 11 }) {
+ for (bool waitIntermediateAcks : { false, true }) {
+ for (ui32 watermarkPeriod : { 0, 1, 3 }) {
+ for (ui32 packets : sizes) {
+ InputTransformMultichannelTests(packets, watermarkPeriod, waitIntermediateAcks, numChannels, rng);
+ }
}
}
}
diff --git a/ydb/library/yql/dq/actors/compute/ut/mock_lookup_factory.cpp b/ydb/library/yql/dq/actors/compute/ut/mock_lookup_factory.cpp
index a7c56e0ad3d..5bf48f39be1 100644
--- a/ydb/library/yql/dq/actors/compute/ut/mock_lookup_factory.cpp
+++ b/ydb/library/yql/dq/actors/compute/ut/mock_lookup_factory.cpp
@@ -139,7 +139,7 @@ private:
if (PayloadType->GetMemberType(0)->IsOptional()) {
valueItems[0] = valueItems[0].MakeOptional();
}
- } else if (val == 999) { // simulate error
+ } else if (val == 9999) { // simulate error
SendError();
return;
}
diff --git a/ydb/library/yql/dq/actors/compute/ut/ya.make b/ydb/library/yql/dq/actors/compute/ut/ya.make
index 2bb32ad97ed..02bbde55132 100644
--- a/ydb/library/yql/dq/actors/compute/ut/ya.make
+++ b/ydb/library/yql/dq/actors/compute/ut/ya.make
@@ -41,6 +41,8 @@ PEERDIR(
YQL_LAST_ABI_VERSION()
+SIZE(MEDIUM)
+
END()
RECURSE(
diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h
index 769a44b7ab5..e61ef571cd3 100644
--- a/ydb/library/yql/dq/actors/task_runner/events.h
+++ b/ydb/library/yql/dq/actors/task_runner/events.h
@@ -119,17 +119,19 @@ struct TEvOutputChannelDataRequest
struct TEvInputChannelData
: NActors::TEventLocal<TEvInputChannelData, TTaskRunnerEvents::EvInputChannelData>
{
- TEvInputChannelData(ui32 channelId, std::optional<TDqSerializedBatch>&& data, bool finish, bool pauseAfterPush)
+ TEvInputChannelData(ui32 channelId, std::optional<TDqSerializedBatch>&& data, bool finish, bool pauseAfterPush, const TMaybe<TInstant>& watermarkAfterPush = {})
: ChannelId(channelId)
, Data(std::move(data))
, Finish(finish)
, PauseAfterPush(pauseAfterPush)
+ , WatermarkAfterPush(watermarkAfterPush)
{ }
const ui32 ChannelId;
std::optional<TDqSerializedBatch> Data; //not const, because we want to efficiently move data out of this event on a reciever side
const bool Finish;
const bool PauseAfterPush;
+ const TMaybe<TInstant> WatermarkAfterPush;
};
//Sent by TaskRunnerActor to ComputeActor to ackonowledge input data received in TEvInputChannelData
@@ -223,7 +225,9 @@ struct TEvTaskRunFinished
const TDqMemoryQuota::TProfileStats& profileStats = {},
ui64 mkqlMemoryLimit = 0,
THolder<TMiniKqlProgramState>&& programState = nullptr,
- bool watermarkInjectedToOutputs = false,
+ TMaybe<TInstant> watermarkInjectedToOutputs = Nothing(),
+ TVector<ui32>&& finishedInputsWithWatermarks = {},
+ TVector<ui32>&& finishedSourcesWithWatermarks = {},
bool checkpointRequestedFromTaskRunner = false,
TDuration computeTime = TDuration::Zero())
: RunStatus(runStatus)
@@ -234,6 +238,8 @@ struct TEvTaskRunFinished
, MkqlMemoryLimit(mkqlMemoryLimit)
, ProgramState(std::move(programState))
, WatermarkInjectedToOutputs(watermarkInjectedToOutputs)
+ , FinishedInputsWithWatermarks(finishedInputsWithWatermarks)
+ , FinishedSourcesWithWatermarks(finishedSourcesWithWatermarks)
, CheckpointRequestedFromTaskRunner(checkpointRequestedFromTaskRunner)
, ComputeTime(computeTime)
{ }
@@ -246,7 +252,9 @@ struct TEvTaskRunFinished
TDqMemoryQuota::TProfileStats ProfileStats;
ui64 MkqlMemoryLimit = 0;
THolder<TMiniKqlProgramState> ProgramState;
- bool WatermarkInjectedToOutputs = false;
+ TMaybe<TInstant> WatermarkInjectedToOutputs = Nothing();
+ TVector<ui32> FinishedInputsWithWatermarks;
+ TVector<ui32> FinishedSourcesWithWatermarks;
bool CheckpointRequestedFromTaskRunner = false;
TDuration ComputeTime;
};
@@ -296,16 +304,6 @@ struct TEvOutputChannelData
bool Changed;
};
-struct TWatermarkRequest {
- TWatermarkRequest() = default;
-
- explicit TWatermarkRequest(TInstant watermark)
- : Watermark(watermark) {
- }
-
- TInstant Watermark;
-};
-
// Holds info required to inject barriers to outputs
struct TCheckpointRequest {
explicit TCheckpointRequest(const NDqProto::TCheckpoint& checkpoint)
@@ -321,7 +319,7 @@ struct TEvContinueRun
TEvContinueRun() = default;
explicit TEvContinueRun(
- TMaybe<TWatermarkRequest>&& watermarkRequest,
+ TMaybe<TInstant>&& watermarkRequest,
TMaybe<TCheckpointRequest>&& checkpointRequest,
bool checkpointOnly
)
@@ -340,7 +338,7 @@ struct TEvContinueRun
bool AskFreeSpace = true;
const TVector<ui32> InputChannels;
ui64 MemLimit;
- TMaybe<TWatermarkRequest> WatermarkRequest = Nothing();
+ TMaybe<TInstant> WatermarkRequest = Nothing();
TMaybe<TCheckpointRequest> CheckpointRequest = Nothing();
bool CheckpointOnly = false;
};
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index db196a295ab..c2e32e718a2 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -131,16 +131,55 @@ private:
}
bool ReadyToCheckpoint() {
+ for (const auto sourceId: Sources) {
+ const auto input = TaskRunner->GetSource(sourceId);
+ // sources are not polled upon checkpoint
+ if (!input->Empty()) { // check if buffer is empty
+ return false;
+ }
+ }
for (const auto inputChannelId: InputsWithCheckpoints) {
const auto input = TaskRunner->GetInputChannel(inputChannelId);
- if (!input->IsPaused()) {
+ if (!input->IsPausedByCheckpoint()) {
+ return false;
+ }
+ if (!input->Empty()) {
+ return false;
+ }
+ }
+ for (const auto transformId: InputTransformsWithCheckpoints) {
+ const auto t = TaskRunner->GetInputTransform(transformId);
+ if (t) {
+ auto [_, transform] = *t;
+ if (!transform->Empty()) {
+ return false;
+ }
+ if (transform->IsPending()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ bool ReadyToWatermark() {
+ for (const auto sourceId: SourcesWithWatermarks) {
+ const auto input = TaskRunner->GetSource(sourceId);
+ // sources are not polled upon watermark
+ if (!input->Empty()) { // check if buffer is empty
+ return false;
+ }
+ }
+ for (const auto inputChannelId: InputsWithWatermarks) {
+ const auto input = TaskRunner->GetInputChannel(inputChannelId);
+ if (!input->IsPausedByWatermark()) {
return false;
}
if (!input->Empty()) {
return false;
}
}
- for (const auto transformId: InputTransforms) {
+ for (const auto transformId: InputTransformsWithWatermarks) {
const auto t = TaskRunner->GetInputTransform(transformId);
if (t) {
auto [_, transform] = *t;
@@ -166,14 +205,26 @@ private:
THashMap<ui32, i64> inputChannelFreeSpace;
THashMap<ui32, i64> sourcesFreeSpace;
- const bool shouldHandleWatermark = ev->Get()->WatermarkRequest.Defined()
- && ev->Get()->WatermarkRequest->Watermark > TaskRunner->GetWatermark().WatermarkIn;
+ const auto& nextWatermark = ev->Get()->WatermarkRequest;
+ if (LastWatermark < nextWatermark) {
+ LastWatermark = *nextWatermark;
+ if (WatermarkRequests.empty()) {
+ if (HasActiveCheckpoint) {
+ LOG_T("Watermark delayed by checkpoint");
+ } else {
+ PauseInputs(*nextWatermark);
+ }
+ }
+ WatermarkRequests.push_back(*nextWatermark);
+ }
if (!ev->Get()->CheckpointOnly) {
- if (shouldHandleWatermark) {
- const auto watermark = ev->Get()->WatermarkRequest->Watermark;
- LOG_T("Task runner. Inject watermark " << watermark);
- TaskRunner->SetWatermarkIn(watermark);
+ if (!WatermarkRequests.empty()) {
+ auto watermarkRequest = WatermarkRequests.front();
+ if (TaskRunner->GetWatermark().WatermarkIn < watermarkRequest && ReadyToWatermark()) {
+ LOG_T("Task runner. Inject watermark " << watermarkRequest);
+ TaskRunner->SetWatermarkIn(watermarkRequest);
+ }
}
res = TaskRunner->Run();
@@ -187,21 +238,31 @@ private:
sourcesFreeSpace[index] = TaskRunner->GetSource(index)->GetFreeSpace();
}
- auto watermarkInjectedToOutputs = false;
+ TMaybe<TInstant> watermarkInjectedToOutputs;
THolder<TMiniKqlProgramState> mkqlProgramState;
if (res == ERunStatus::PendingInput || res == ERunStatus::Finished) {
- if (shouldHandleWatermark) {
- const auto watermarkRequested = ev->Get()->WatermarkRequest->Watermark;
- LOG_T("Task runner. Watermarks. Injecting requested watermark " << watermarkRequested
+ if (!WatermarkRequests.empty() && WatermarkRequests.front() == TaskRunner->GetWatermark().WatermarkIn) {
+ auto watermarkRequest = WatermarkRequests.front();
+ WatermarkRequests.pop_front();
+ LOG_T("Task runner. Watermarks. Injecting requested watermark " << watermarkRequest
<< " to " << OutputsWithWatermarks.size() << " outputs ");
for (const auto& channelId : OutputsWithWatermarks) {
NDqProto::TWatermark watermark;
- watermark.SetTimestampUs(watermarkRequested.MicroSeconds());
+ watermark.SetTimestampUs(watermarkRequest.MicroSeconds());
TaskRunner->GetOutputChannel(channelId)->Push(std::move(watermark));
}
-
- watermarkInjectedToOutputs = true;
+ ResumeByWatermark(watermarkRequest);
+ watermarkInjectedToOutputs = watermarkRequest;
+ if (!WatermarkRequests.empty()) {
+ if (HasActiveCheckpoint) {
+ LOG_T("Next watermark delayed by active checkpoint");
+ } else {
+ auto nextWatermarkRequest = WatermarkRequests.front();
+ LOG_T("Task runner. Re-pause on watermark " << nextWatermarkRequest);
+ PauseInputs(nextWatermarkRequest);
+ }
+ }
}
if (ev->Get()->CheckpointRequest.Defined() && ReadyToCheckpoint()) {
@@ -223,6 +284,40 @@ private:
LOG_E("Failed to save state: " << e.what());
mkqlProgramState = nullptr;
}
+ HasActiveCheckpoint = false;
+ if (!WatermarkRequests.empty()) {
+ auto nextWatermarkRequest = WatermarkRequests.front();
+ LOG_T("Task runner. Pause by watermark " << nextWatermarkRequest);
+ PauseInputs(nextWatermarkRequest);
+ }
+ }
+ }
+
+ TVector<ui32> finishedInputsWithWatermarks;
+ TVector<ui32> finishedSourcesWithWatermarks;
+ if (WatermarkRequests.empty()) {
+ // check whether any inputs have become empty and finished, and drop them from tracking
+ for (ui32 i = InputsWithWatermarksPendingFinish; i < InputsWithWatermarks.size(); ) {
+ auto& channelId = InputsWithWatermarks[i];
+ auto inputChannel = TaskRunner->GetInputChannel(channelId);
+ if (inputChannel->IsFinished()) {
+ finishedInputsWithWatermarks.push_back(channelId);
+ std::swap(channelId, InputsWithWatermarks.back());
+ InputsWithWatermarks.pop_back();
+ } else {
+ ++i;
+ }
+ }
+ for (ui32 i = SourcesWithWatermarksPendingFinish; i < SourcesWithWatermarks.size(); ) {
+ auto& sourceId = SourcesWithWatermarks[i];
+ auto source = TaskRunner->GetSource(sourceId);
+ if (source->IsFinished()) {
+ finishedSourcesWithWatermarks.push_back(sourceId);
+ std::swap(sourceId, SourcesWithWatermarks.back());
+ SourcesWithWatermarks.pop_back();
+ } else {
+ ++i;
+ }
}
}
@@ -258,6 +353,8 @@ private:
MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : 0,
std::move(mkqlProgramState),
watermarkInjectedToOutputs,
+ std::move(finishedInputsWithWatermarks),
+ std::move(finishedSourcesWithWatermarks),
ev->Get()->CheckpointRequest.Defined(),
TInstant::Now() - start),
/*flags=*/0,
@@ -275,9 +372,26 @@ private:
const ui64 freeSpace = inputChannel->GetFreeSpace();
if (finish) {
inputChannel->Finish();
+
+ // if the finished channel was tracked for watermarks, move it to the pending-finish part
+ Y_DEBUG_ABORT_UNLESS(InputsWithWatermarksPendingFinish <= InputsWithWatermarks.size());
+ const auto end = InputsWithWatermarks.begin() + InputsWithWatermarksPendingFinish;
+ auto it = std::find(InputsWithWatermarks.begin(), end, channelId); // O(n), but rare/once-per-channel
+ if (it != end) {
+ Y_DEBUG_ABORT_UNLESS(InputsWithWatermarksPendingFinish > 0);
+ --InputsWithWatermarksPendingFinish;
+ std::swap(*it, InputsWithWatermarks[InputsWithWatermarksPendingFinish]);
+ }
}
if (ev->Get()->PauseAfterPush) {
- inputChannel->Pause();
+ HasActiveCheckpoint = true;
+ inputChannel->PauseByCheckpoint();
+ }
+ if (ev->Get()->WatermarkAfterPush) {
+ LOG_T("Adding " << *ev->Get()->WatermarkAfterPush);
+ inputChannel->AddWatermark(*ev->Get()->WatermarkAfterPush);
+ } else {
+ LOG_T("No watermark ");
}
// run
@@ -299,6 +413,15 @@ private:
source->Push(std::move(batch), space);
if (finish) {
source->Finish();
+
+ Y_DEBUG_ABORT_UNLESS(SourcesWithWatermarksPendingFinish <= SourcesWithWatermarks.size());
+ const auto end = SourcesWithWatermarks.begin() + SourcesWithWatermarksPendingFinish;
+ auto it = std::find(SourcesWithWatermarks.begin(), end, index); // O(n), but rare/once-per-channel
+ if (it != end) {
+ Y_DEBUG_ABORT_UNLESS(SourcesWithWatermarksPendingFinish > 0);
+ --SourcesWithWatermarksPendingFinish;
+ std::swap(*it, SourcesWithWatermarks[SourcesWithWatermarksPendingFinish]);
+ }
}
Send(
ParentId,
@@ -357,7 +480,7 @@ private:
checkpoint = hasCheckpoint ? std::move(poppedCheckpoint) : TMaybe<NDqProto::TCheckpoint>();
if (hasCheckpoint) {
- ResumeInputs();
+ ResumeByCheckpoint();
break;
}
@@ -381,9 +504,21 @@ private:
ev->Cookie);
}
- void ResumeInputs() {
- for (const auto& inputId : Inputs) {
- TaskRunner->GetInputChannel(inputId)->Resume();
+ void ResumeByCheckpoint() {
+ for (const auto& inputId : InputsWithCheckpoints) {
+ TaskRunner->GetInputChannel(inputId)->ResumeByCheckpoint();
+ }
+ }
+
+ void ResumeByWatermark(TInstant watermark) {
+ for (const auto& inputId : InputsWithWatermarks) {
+ TaskRunner->GetInputChannel(inputId)->ResumeByWatermark(watermark);
+ }
+ }
+
+ void PauseInputs(TInstant watermark) {
+ for (const auto& inputId : InputsWithWatermarks) {
+ TaskRunner->GetInputChannel(inputId)->PauseByWatermark(watermark);
}
}
@@ -404,7 +539,7 @@ private:
if (hasCheckpoint) {
checkpointSize = checkpoint.ByteSize();
maybeCheckpoint.ConstructInPlace(std::move(checkpoint));
- ResumeInputs();
+ ResumeByCheckpoint();
}
const bool finished = sink->IsFinished();
const bool changed = finished || size > 0 || hasCheckpoint;
@@ -422,19 +557,43 @@ private:
auto& inputs = settings.GetInputs();
for (auto inputId = 0; inputId < inputs.size(); inputId++) {
auto& input = inputs[inputId];
+ bool inputWatermarksDisabled = false;
+ bool inputCheckpointDisabled = false;
if (input.HasSource()) {
Sources.emplace_back(inputId);
+ if (input.GetSource().GetWatermarksMode() == NDqProto::WATERMARKS_MODE_DISABLED) {
+ inputWatermarksDisabled = true;
+ } else {
+ SourcesWithWatermarks.emplace_back(inputId);
+ }
} else {
for (auto& channel : input.GetChannels()) {
Inputs.emplace_back(channel.GetId());
if (channel.GetCheckpointingMode() != NDqProto::CHECKPOINTING_MODE_DISABLED) {
InputsWithCheckpoints.emplace_back(channel.GetId());
+ } else {
+ inputCheckpointDisabled = true;
+ }
+ if (channel.GetWatermarksMode() != NDqProto::WATERMARKS_MODE_DISABLED) {
+ InputsWithWatermarks.emplace_back(channel.GetId());
+ } else {
+ inputWatermarksDisabled = true;
}
}
}
+ if (input.HasTransform()) {
+ if (!inputWatermarksDisabled) {
+ InputTransformsWithWatermarks.emplace_back(inputId);
+ }
+ if (!inputCheckpointDisabled) {
+ InputTransformsWithCheckpoints.emplace_back(inputId);
+ }
+ }
}
std::sort(Inputs.begin(), Inputs.end());
Y_ENSURE(std::unique(Inputs.begin(), Inputs.end()) == Inputs.end());
+ InputsWithWatermarksPendingFinish = InputsWithWatermarks.size();
+ SourcesWithWatermarksPendingFinish = SourcesWithWatermarks.size();
auto& outputs = settings.GetOutputs();
for (auto outputId = 0; outputId < outputs.size(); outputId++) {
@@ -526,14 +685,23 @@ private:
const ui64 TaskId;
TVector<ui32> Inputs;
TVector<ui32> InputsWithCheckpoints;
+ TVector<ui32> InputsWithWatermarks;
+ ui32 InputsWithWatermarksPendingFinish; // index in InputsWithWatermarks starting from which input channels have a pending finish mark
TVector<ui32> InputTransforms;
+ TVector<ui32> InputTransformsWithCheckpoints;
+ TVector<ui32> InputTransformsWithWatermarks;
TVector<ui32> Sources;
+ TVector<ui32> SourcesWithWatermarks;
+ ui32 SourcesWithWatermarksPendingFinish; // index in SourcesWithWatermarks starting from which sources have a pending finish mark
TVector<ui32> Sinks;
TVector<ui32> Outputs;
TVector<ui32> OutputsWithWatermarks;
TIntrusivePtr<NDq::IDqTaskRunner> TaskRunner;
THolder<TDqMemoryQuota> MemoryQuota;
ui64 ActorElapsedTicks = 0;
+ TMaybe<TInstant> LastWatermark;
+ std::deque<TInstant> WatermarkRequests;
+ bool HasActiveCheckpoint = false;
};
struct TLocalTaskRunnerActorFactory: public ITaskRunnerActorFactory {
diff --git a/ydb/library/yql/dq/runtime/dq_async_input.cpp b/ydb/library/yql/dq/runtime/dq_async_input.cpp
index 7f6265660b5..5c4701a9a32 100644
--- a/ydb/library/yql/dq/runtime/dq_async_input.cpp
+++ b/ydb/library/yql/dq/runtime/dq_async_input.cpp
@@ -58,7 +58,7 @@ public:
}
bool IsPending() const override {
- return Pending;
+ return Pending && !IsFinished();
}
};
diff --git a/ydb/library/yql/dq/runtime/dq_input.h b/ydb/library/yql/dq/runtime/dq_input.h
index 945430362be..db2a4bf6292 100644
--- a/ydb/library/yql/dq/runtime/dq_input.h
+++ b/ydb/library/yql/dq/runtime/dq_input.h
@@ -43,9 +43,24 @@ public:
// After pause IDqInput::Pop() stops return batches that were pushed before pause
// and returns Empty() after all the data before pausing was read.
// Compute Actor can push data after pause, but program won't receive it until Resume() is called.
- virtual void Pause() = 0;
- virtual void Resume() = 0;
- virtual bool IsPaused() const = 0;
+ virtual void PauseByCheckpoint() = 0;
+ virtual void ResumeByCheckpoint() = 0;
+ virtual bool IsPausedByCheckpoint() const = 0;
+ // Watermarks
+ // Called after receiving watermark (watermark position remembered,
+ // but does not pause channel); watermark may be pushed behind new data
+ // by reading or checkpoint
+ virtual void AddWatermark(TInstant watermark) = 0;
+ // Called after watermark is ready for TaskRunner (got watermarks on all channels;
+ // implies channel must already contain greater-or-equal watermark);
+ // Same as with PauseByCheckpoint, any data added after Pause is not received until Resume;
+ // If called after PauseByCheckpoint(), checkpoint takes priority
+ virtual void PauseByWatermark(TInstant watermark) = 0;
+ // Called after watermark processed by TaskRunner;
+ // If called before PauseByCheckpoint, all watermarks greater than this
+ // are moved after the checkpoint (with no data between checkpoint and watermark)
+ virtual void ResumeByWatermark(TInstant watermark) = 0;
+ virtual bool IsPausedByWatermark() const = 0;
};
} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.cpp b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
index f0098f0fc0f..cbc224daed6 100644
--- a/ydb/library/yql/dq/runtime/dq_input_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_input_channel.cpp
@@ -107,9 +107,19 @@ public:
return (DataForDeserialize.empty() || Impl.IsPaused()) && Impl.Empty();
}
- void Pause() override {
+ void PauseByCheckpoint() override {
DeserializeAllData();
- Impl.Pause();
+ Impl.PauseByCheckpoint();
+ }
+
+ void AddWatermark(TInstant watermark) override {
+ DeserializeAllData();
+ Impl.AddWatermark(watermark);
+ }
+
+ void PauseByWatermark(TInstant watermark) override {
+ DeserializeAllData();
+ Impl.PauseByWatermark(watermark);
}
bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch) override {
@@ -147,12 +157,20 @@ public:
return Impl.GetInputType();
}
- void Resume() override {
- Impl.Resume();
+ void ResumeByCheckpoint() override {
+ Impl.ResumeByCheckpoint();
+ }
+
+ bool IsPausedByCheckpoint() const override {
+ return Impl.IsPausedByCheckpoint();
+ }
+
+ void ResumeByWatermark(TInstant watermark) override {
+ Impl.ResumeByWatermark(watermark);
}
- bool IsPaused() const override {
- return Impl.IsPaused();
+ bool IsPausedByWatermark() const override {
+ return Impl.IsPausedByWatermark();
}
void Finish() override {
diff --git a/ydb/library/yql/dq/runtime/dq_input_impl.h b/ydb/library/yql/dq/runtime/dq_input_impl.h
index 883a927fb4f..37c0b9e31ef 100644
--- a/ydb/library/yql/dq/runtime/dq_input_impl.h
+++ b/ydb/library/yql/dq/runtime/dq_input_impl.h
@@ -36,11 +36,6 @@ public:
return StoredBytes;
}
- [[nodiscard]]
- bool Empty() const override {
- return Batches.empty() || (IsPaused() && GetBatchesBeforePause() == 0);
- }
-
bool IsLegacySimpleBlock(NKikimr::NMiniKQL::TStructType* structType, ui32& blockLengthIndex) {
auto index = structType->FindMemberIndex(BlockLengthColumnName);
if (index) {
@@ -169,6 +164,7 @@ public:
}
Batches.emplace_back(std::move(batch));
+ AddBatchCounts(space, rows);
return rows;
}
@@ -186,12 +182,12 @@ public:
auto& popStats = static_cast<TDerived*>(this)->PopStats;
popStats.Resume(); //save timing before processing
- ui64 popBytes = 0;
- if (IsPaused()) {
- ui64 batchesCount = GetBatchesBeforePause();
- Y_ABORT_UNLESS(batchesCount > 0);
- Y_ABORT_UNLESS(batchesCount <= Batches.size());
+ auto [popBytes, popRows, batchesCount] = PopReadyCounts();
+ Y_ABORT_UNLESS(batchesCount > 0);
+ Y_ABORT_UNLESS(batchesCount <= Batches.size());
+
+ if (batchesCount != Batches.size()) {
if (batch.IsWide()) {
while (batchesCount--) {
@@ -211,15 +207,8 @@ public:
}
}
- popBytes = StoredBytesBeforePause;
-
- BatchesBeforePause = PauseMask;
- Y_ABORT_UNLESS(GetBatchesBeforePause() == 0);
- StoredBytes -= StoredBytesBeforePause;
- StoredRows -= StoredRowsBeforePause;
- StoredBytesBeforePause = 0;
- StoredRowsBeforePause = 0;
} else {
+
if (batch.IsWide()) {
for (auto&& part : Batches) {
part.ForEachRowWide([&batch](NUdf::TUnboxedValue* values, ui32 width) {
@@ -234,16 +223,15 @@ public:
}
}
- popBytes = StoredBytes;
-
- StoredBytes = 0;
- StoredRows = 0;
Batches.clear();
}
+ StoredBytes -= popBytes;
+ StoredRows -= popRows;
+
if (popStats.CollectBasic()) {
popStats.Bytes += popBytes;
- popStats.Rows += GetRowsCount(batch);
+ popStats.Rows += popRows;
popStats.Chunks++;
}
@@ -263,27 +251,166 @@ public:
return InputType;
}
- void Pause() override {
- Y_ABORT_UNLESS(!IsPaused());
- BatchesBeforePause = Batches.size() | PauseMask;
- StoredRowsBeforePause = StoredRows;
- StoredBytesBeforePause = StoredBytes;
+ [[nodiscard]]
+ bool Empty() const override {
+ return Batches.empty() || IsPaused() && BeforeBarrier.Batches == 0;
+ }
+
+private:
+ void AddBatchCounts(ui64 space, ui64 rows) {
+ auto& barrier = PendingBarriers.empty() ? BeforeBarrier : PendingBarriers.back();
+ barrier.Batches ++;
+ barrier.Bytes += space;
+ barrier.Rows += rows;
+ }
+
+ std::tuple<ui64, ui64, ui64> PopReadyCounts() {
+ if (!PendingBarriers.empty() && !IsPaused()) {
+ // There were watermarks, but channel is not paused
+ // Process data anyway and move watermarks behind
+ auto lastBarrier = PendingBarriers.back().Barrier;
+ for (const auto& barrier : PendingBarriers) {
+ Y_ENSURE(!barrier.IsCheckpoint());
+ BeforeBarrier += barrier;
+ }
+ PendingBarriers.clear();
+ PendingBarriers.emplace_back(TBarrier { .Barrier = lastBarrier });
+ }
+ auto popBatches = BeforeBarrier.Batches;
+ auto popBytes = BeforeBarrier.Bytes;
+ auto popRows = BeforeBarrier.Rows;
+
+ BeforeBarrier.Clear();
+ return std::make_tuple(popBytes, popRows, popBatches);
}
- void Resume() override {
- StoredBytesBeforePause = StoredRowsBeforePause = BatchesBeforePause = 0;
- Y_ABORT_UNLESS(!IsPaused());
+public:
+ bool IsPaused() const {
+ return IsPausedByWatermark() || IsPausedByCheckpoint();
}
- bool IsPaused() const override {
- return BatchesBeforePause;
+private:
+ void SkipWatermarksBeforeBarrier() {
+ // Drop watermarks before current barrier
+ while (!PendingBarriers.empty()) {
+ auto& barrier = PendingBarriers.front();
+ if (barrier.Barrier >= PauseBarrier) {
+ break;
+ }
+ BeforeBarrier.Batches += barrier.Batches;
+ BeforeBarrier.Rows += barrier.Rows;
+ BeforeBarrier.Bytes += barrier.Bytes;
+ PendingBarriers.pop_front();
+ }
}
-protected:
- ui64 GetBatchesBeforePause() const {
- return BatchesBeforePause & ~PauseMask;
+public:
+ void PauseByWatermark(TInstant watermark) override {
+ Y_ENSURE(PauseBarrier <= watermark);
+ PauseBarrier = watermark;
+ if (IsPausedByCheckpoint()) {
+ return;
+ }
+ if (IsFinished()) {
+ return;
+ }
+ SkipWatermarksBeforeBarrier();
+ Y_ENSURE(!PendingBarriers.empty());
+ Y_ENSURE(PendingBarriers.front().Barrier >= watermark);
}
+ void PauseByCheckpoint() override {
+ Y_ENSURE(!IsPausedByCheckpoint());
+ if (PauseBarrier != TBarrier::NoBarrier) {
+ Y_ENSURE(!PendingBarriers.empty());
+ if (PendingBarriers.front().Barrier > PauseBarrier) {
+ // (1.BeforeBarrier) (3.Watermark > PauseBarrier) (4.Some data and watermarks) | (5.Here will be checkpoint)
+ // ->
+ // (1.BeforeBarrier) (3.Fake watermark == PauseBarrier with data from 3 & 4 behind) (Checkpoint with empty data behind) (Max watermark from 3 & 4 with empty data behind)
+ auto lastWatermark = PendingBarriers.back().Barrier;
+ TBarrier fakeWatermark(PauseBarrier);
+ for (auto& barrier: PendingBarriers) {
+ fakeWatermark += barrier;
+ }
+ PendingBarriers.clear();
+ PendingBarriers.emplace_back(fakeWatermark);
+ PendingBarriers.emplace_back(); // CheckpointBarrier
+ PendingBarriers.emplace_back(TBarrier { .Barrier = lastWatermark });
+ } else {
+ Y_ENSURE(PendingBarriers.front().Barrier == PauseBarrier);
+ // (1.BeforeBarrier) (3.Watermark == PauseBarrier) (4.Some data and watermarks) | (5.Here will be checkpoint)
+ // ->
+ // (1.BeforeBarrier) (3.Watermark == PauseBarrier with all data from 3 & 4 behind) (Checkpoint with empty data behind) (Max watermark from 4 with empty data behind)
+ auto lastWatermark = PendingBarriers.size() > 1 ? PendingBarriers.back().Barrier : TBarrier::NoBarrier;
+ for (auto& firstWatermark = PendingBarriers.front(); PendingBarriers.size() > 1; PendingBarriers.pop_back()) {
+ firstWatermark += PendingBarriers.back();
+ }
+ PendingBarriers.emplace_back(); // CheckpointBarrier
+ if (lastWatermark != TBarrier::NoBarrier) {
+ PendingBarriers.emplace_back(TBarrier { .Barrier = lastWatermark });
+ }
+ }
+ } else if (PendingBarriers.empty()) {
+ PendingBarriers.emplace_front(); // CheckpointBarrier
+ } else {
+ // (1.BeforeBarrier) (4.Some data and watermarks) | (5.Here will be checkpoint)
+ // ->
+ // (1.BeforeBarrier + all data from 4) (5.Checkpoint with empty data behind) (Max watermark from 4 if any with empty data behind)
+ auto lastWatermark = PendingBarriers.back().Barrier;
+ for (auto& barrier: PendingBarriers) { // Move all collected data before checkpoint
+ BeforeBarrier += barrier;
+ }
+ PendingBarriers.clear();
+ PendingBarriers.emplace_back(); // CheckpointBarrier
+ PendingBarriers.emplace_back(TBarrier { .Barrier = lastWatermark });
+ }
+ }
+
+ void AddWatermark(TInstant watermark) override {
+ if (!PendingBarriers.empty() && PendingBarriers.back().Batches == 0 && !PendingBarriers.back().IsCheckpoint()) {
+ Y_ENSURE(PendingBarriers.back().Rows == 0);
+ Y_ENSURE(PendingBarriers.back().Bytes == 0);
+ PendingBarriers.back().Barrier = watermark;
+ } else {
+ PendingBarriers.emplace_back(TBarrier { .Barrier = watermark });
+ }
+ }
+
+ bool IsPausedByWatermark() const override {
+ return !IsPausedByCheckpoint() && PauseBarrier != TBarrier::NoBarrier;
+ }
+
+ bool IsPausedByCheckpoint() const override {
+ return !PendingBarriers.empty() && PendingBarriers.front().IsCheckpoint();
+ }
+
+ void ResumeByWatermark(TInstant watermark) override {
+ Y_ENSURE(Empty());
+ Y_ENSURE(PauseBarrier == watermark);
+ PauseBarrier = TBarrier::NoBarrier;
+ if (IsFinished()) {
+ return;
+ }
+ Y_ENSURE(!PendingBarriers.empty());
+ Y_ENSURE(PendingBarriers.front().Barrier >= watermark);
+ if (PendingBarriers.front().Barrier == watermark) {
+ BeforeBarrier = PendingBarriers.front();
+ PendingBarriers.pop_front();
+ }
+ Y_ENSURE(PendingBarriers.empty() || PendingBarriers.front().Barrier > watermark);
+ }
+
+ void ResumeByCheckpoint() override {
+ Y_ENSURE(IsPausedByCheckpoint());
+ Y_ENSURE(Empty());
+ BeforeBarrier = PendingBarriers.front();
+ PendingBarriers.pop_front();
+ // There can be watermarks before current barrier exposed by checkpoint removal
+ SkipWatermarksBeforeBarrier();
+ }
+
+protected:
+
TMaybe<ui32> GetWidth() const {
return Width;
}
@@ -296,12 +423,35 @@ protected:
ui64 StoredBytes = 0;
ui64 StoredRows = 0;
bool Finished = false;
- ui64 BatchesBeforePause = 0;
- ui64 StoredBytesBeforePause = 0;
- ui64 StoredRowsBeforePause = 0;
- static constexpr ui64 PauseMask = 1llu << 63llu;
TInputChannelFormat Format = FORMAT_UNKNOWN;
ui32 LegacyBlockLengthIndex = 0;
+
+ struct TBarrier {
+ static constexpr TInstant NoBarrier = TInstant::Zero();
+ static constexpr TInstant CheckpointBarrier = TInstant::Max();
+ TInstant Barrier = CheckpointBarrier;
+ ui64 Batches = 0;
+ ui64 Bytes = 0;
+ ui64 Rows = 0;
+ // watermark (!= TInstant::Max()) or checkpoint (TInstant::Max())
+ bool IsCheckpoint() const {
+ return Barrier == CheckpointBarrier;
+ }
+ TBarrier& operator+= (const TBarrier& other) {
+ Batches += other.Batches;
+ Bytes += other.Bytes;
+ Rows += other.Rows;
+ return *this;
+ }
+ void Clear() {
+ Batches = 0;
+ Bytes = 0;
+ Rows = 0;
+ }
+ };
+ std::deque<TBarrier> PendingBarriers; // barrier and counts after barrier
+ TBarrier BeforeBarrier; // counts before barrier
+ TInstant PauseBarrier; // Watermark barrier or TBarrier::NoBarrier
};
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 07058c45a4e..440f94b3b20 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -736,15 +736,31 @@ public:
ythrow yexception() << "unimplemented";
}
- void Pause() override {
+ void PauseByCheckpoint() override {
Y_ABORT("Checkpoints are not supported");
}
- void Resume() override {
+ void ResumeByCheckpoint() override {
Y_ABORT("Checkpoints are not supported");
}
- bool IsPaused() const override {
+ bool IsPausedByCheckpoint() const override {
+ return false;
+ }
+
+ void PauseByWatermark(TInstant) override {
+ Y_ABORT("Watermarks are not supported");
+ }
+
+ void AddWatermark(TInstant) override {
+ Y_ABORT("Watermarks are not supported");
+ }
+
+ void ResumeByWatermark(TInstant) override {
+ Y_ABORT("Watermarks are not supported");
+ }
+
+ bool IsPausedByWatermark() const override {
return false;
}
@@ -892,15 +908,31 @@ public:
return InputType;
}
- void Pause() override {
+ void PauseByCheckpoint() override {
Y_ABORT("Checkpoints are not supported");
}
- void Resume() override {
+ void ResumeByCheckpoint() override {
Y_ABORT("Checkpoints are not supported");
}
- bool IsPaused() const override {
+ bool IsPausedByCheckpoint() const override {
+ return false;
+ }
+
+ void PauseByWatermark(TInstant) override {
+ Y_ABORT("Watermarks are not supported");
+ }
+
+ void AddWatermark(TInstant) override {
+ Y_ABORT("Watermarks are not supported");
+ }
+
+ void ResumeByWatermark(TInstant) override {
+ Y_ABORT("Watermarks are not supported");
+ }
+
+ bool IsPausedByWatermark() const override {
return false;
}