aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authord-mokhnatkin <d-mokhnatkin@ydb.tech>2022-09-15 19:30:35 +0300
committerd-mokhnatkin <d-mokhnatkin@ydb.tech>2022-09-15 19:30:35 +0300
commit85c93b9ca9dd965a0795dbfd8cb29c4ffc3ec8b9 (patch)
treef27aefdee03a03c7a3721e7a6b8e9047c5b7e350
parente13e0cbfa495fea6d77c2293d096b440feef81dd (diff)
downloadydb-85c93b9ca9dd965a0795dbfd8cb29c4ffc3ec8b9.tar.gz
watermarks implementation
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp2
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp2
-rw-r--r--ydb/library/yql/core/expr_nodes/yql_expr_nodes.json3
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_list.cpp4
-rw-r--r--ydb/library/yql/dq/actors/compute/CMakeLists.txt6
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp111
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h11
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h211
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp93
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h36
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp169
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h52
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h80
-rw-r--r--ydb/library/yql/dq/actors/compute/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/library/yql/dq/actors/compute/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp83
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_events.proto10
-rw-r--r--ydb/library/yql/dq/actors/task_runner/events.h41
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp72
-rw-r--r--ydb/library/yql/dq/proto/dq_tasks.proto10
-rw-r--r--ydb/library/yql/dq/runtime/dq_async_output.cpp31
-rw-r--r--ydb/library/yql/dq/runtime/dq_async_output.h3
-rw-r--r--ydb/library/yql/dq/runtime/dq_output.h2
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel.cpp32
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel.h3
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp6
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h6
-rw-r--r--ydb/library/yql/dq/tasks/dq_tasks_graph.h3
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp31
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h8
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp3
-rw-r--r--ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp11
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp4
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.h3
-rw-r--r--ydb/library/yql/minikql/mkql_watermark.h12
-rw-r--r--ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp2
-rw-r--r--ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp4
-rw-r--r--ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp4
-rw-r--r--ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h42
-rw-r--r--ydb/library/yql/providers/dq/actors/executer_actor.cpp2
-rw-r--r--ydb/library/yql/providers/dq/actors/worker_actor.cpp5
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.cpp2
-rw-r--r--ydb/library/yql/providers/dq/common/yql_dq_settings.h5
-rw-r--r--ydb/library/yql/providers/dq/opt/logical_optimize.cpp18
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.cpp62
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.h2
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp27
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp1
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp34
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.h2
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp211
-rw-r--r--ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h1
-rw-r--r--ydb/library/yql/providers/pq/common/yql_names.h1
-rw-r--r--ydb/library/yql/providers/pq/proto/dq_io.proto6
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp19
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp4
-rw-r--r--ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp2
60 files changed, 1426 insertions, 192 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 9b75341d47e..291a547b873 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -152,7 +152,7 @@ private:
TActorBootstrapped<TKqpStreamLookupActor>::PassAway();
}
- i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64) final {
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, TMaybe<TInstant>&, bool& finished, i64) final {
i64 totalDataSize = 0;
if (TableScheme) {
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 0546662c690..366a0dd290b 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -1328,6 +1328,8 @@ private:
apply("EnableComputeActor", "1");
apply("ComputeActorType", "async");
apply("_EnablePrecompute", "1");
+ apply("WatermarksMode", "disable");
+ apply("WatermarksGranularityMs", "1000");
switch (Params.QueryType) {
case YandexQuery::QueryContent::STREAMING: {
diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
index 342a1f2e704..dc77e6f1275 100644
--- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
+++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json
@@ -1690,7 +1690,8 @@
{"Index": 9, "Name": "SaveHandler", "Type": "TExprBase"},
{"Index": 10, "Name": "LoadHandler", "Type": "TExprBase"},
{"Index": 11, "Name": "MergeHandler", "Type": "TCoLambda"},
- {"Index": 12, "Name": "FinishHandler", "Type": "TCoLambda"}
+ {"Index": 12, "Name": "FinishHandler", "Type": "TCoLambda"},
+ {"Index": 13, "Name": "WatermarkMode", "Type": "TExprBase"}
]
},
{
diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp
index 17c32e5eb35..e8a69409684 100644
--- a/ydb/library/yql/core/type_ann/type_ann_list.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp
@@ -5003,7 +5003,7 @@ namespace {
TStringBuilder() << "Unsupported agg name: " << name));
return IGraphTransformer::TStatus::Error;
}
-
+
return IGraphTransformer::TStatus::Ok;
}
@@ -6053,7 +6053,7 @@ namespace {
IGraphTransformer::TStatus MultiHoppingCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
Y_UNUSED(output);
- if (!EnsureArgsCount(*input, 13, ctx.Expr)) {
+ if (!EnsureArgsCount(*input, 14, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
auto& item = input->ChildRef(0);
diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.txt
index 02fbceab3a9..c1d52365482 100644
--- a/ydb/library/yql/dq/actors/compute/CMakeLists.txt
+++ b/ydb/library/yql/dq/actors/compute/CMakeLists.txt
@@ -27,12 +27,14 @@ target_link_libraries(dq-actors-compute PUBLIC
yql-public-issue
)
target_sources(dq-actors-compute PRIVATE
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
- ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_issues_buffer.cpp
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/retry_queue.cpp
)
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index e90e3ec0107..7eb7ec80072 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -259,7 +259,7 @@ private:
return true; // handled channels syncronously
}
CA_LOG_D("DoHandleChannelsAfterFinishImpl");
- AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(std::move(req), /* checkpointOnly = */ true));
+ AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(GetWatermarkRequest(), std::move(req), /* checkpointOnly = */ true));
return false;
}
@@ -289,15 +289,40 @@ private:
auto finished = channelData.GetFinished();
+ TMaybe<TInstant> watermark;
+ if (channelData.HasWatermark()) {
+ watermark = TInstant::MicroSeconds(channelData.GetWatermark().GetTimestampUs());
+
+ const bool channelWatermarkChanged = WatermarksTracker.NotifyInChannelWatermarkReceived(
+ inputChannel->ChannelId,
+ *watermark
+ );
+
+ if (channelWatermarkChanged) {
+ CA_LOG_T("Pause input channel " << channelData.GetChannelId() << " bacause of watermark " << *watermark);
+ inputChannel->Pause(*watermark);
+ }
+
+ WatermarkTakeInputChannelDataRequests[*watermark]++;
+ }
+
+ DqComputeActorMetrics.ReportInputChannelWatermark(
+ channelData.GetChannelId(),
+ channelData.GetData().GetRows(),
+ watermark);
+
auto ev = (channelData.GetData().GetRows())
? MakeHolder<NTaskRunnerActor::TEvPush>(
channelData.GetChannelId(),
std::move(*channelData.MutableData()),
finished,
- /*askFreeSpace = */ true,
+ /* askFreeSpace = */ true,
/* pauseAfterPush = */ channelData.HasCheckpoint())
: MakeHolder<NTaskRunnerActor::TEvPush>(
- channelData.GetChannelId(), finished, /*askFreeSpace = */ true, /* pauseAfterPush = */ channelData.HasCheckpoint());
+ channelData.GetChannelId(),
+ finished,
+ /* askFreeSpace = */ true,
+ /* pauseAfterPush = */ channelData.HasCheckpoint());
Send(TaskRunnerActorId, ev.Release(), 0, Cookie);
@@ -309,7 +334,7 @@ private:
Checkpoints->RegisterCheckpoint(checkpoint, channelData.GetChannelId());
}
- TakeInputChannelDataRequests[Cookie++] = TTakeInputChannelData{ack, channelData.GetChannelId()};
+ TakeInputChannelDataRequests[Cookie++] = TTakeInputChannelData{ack, channelData.GetChannelId(), watermark};
}
void PassAway() override {
@@ -327,6 +352,31 @@ private:
TBase::PassAway();
}
+ TMaybe<NTaskRunnerActor::TWatermarkRequest> GetWatermarkRequest() {
+ if (!WatermarksTracker.HasPendingWatermark()) {
+ return Nothing();
+ }
+
+ const auto pendingWatermark = *WatermarksTracker.GetPendingWatermark();
+ if (WatermarkTakeInputChannelDataRequests.contains(pendingWatermark)) {
+ // Not all precending to watermark input channels data has been injected
+ return Nothing();
+ }
+
+ TVector<ui32> channelIds;
+ for (const auto& [channelId, info] : OutputChannelsMap) {
+ if (info.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) {
+ continue;
+ }
+
+ channelIds.emplace_back(channelId);
+ }
+
+ DqComputeActorMetrics.ReportInjectedToTaskRunnerWatermark(pendingWatermark);
+
+ return TMaybe<NTaskRunnerActor::TWatermarkRequest>({std::move(channelIds), pendingWatermark});
+ }
+
TMaybe<NTaskRunnerActor::TCheckpointRequest> GetCheckpointRequest() {
TMaybe<NTaskRunnerActor::TCheckpointRequest> req = Nothing();
if (!CheckpointRequestedFromTaskRunner && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved()) {
@@ -343,7 +393,7 @@ private:
if (ProcessSourcesState.Inflight == 0) {
auto req = GetCheckpointRequest();
CA_LOG_T("DoExecuteImpl: " << (bool) req);
- AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(std::move(req), /* checkpointOnly = */ false));
+ AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(GetWatermarkRequest(), std::move(req), /* checkpointOnly = */ false));
}
}
@@ -428,6 +478,11 @@ private:
PollSources(std::move(sourcesState));
}
+ if (ev->Get()->WatermarkInjectedToOutputs && !WatermarksTracker.HasOutputChannels()) {
+ ResumeInputsByWatermark(*WatermarksTracker.GetPendingWatermark());
+ WatermarksTracker.PopPendingWatermark();
+ }
+
ReadyToCheckpointFlag = (bool) ev->Get()->ProgramState;
if (ev->Get()->CheckpointRequestedFromTaskRunner) {
CheckpointRequestedFromTaskRunner = false;
@@ -473,7 +528,7 @@ private:
ProcessSourcesState.Inflight--;
if (ProcessSourcesState.Inflight == 0) {
CA_LOG_T("send TEvContinueRun on OnAsyncInputPushFinished");
- AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>());
+ AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(GetWatermarkRequest(), Nothing(), false));
}
}
@@ -487,6 +542,7 @@ private:
TOutputChannelInfo& outputChannel = it->second;
Y_VERIFY(!outputChannel.AsyncData); // have finished previous cycle
outputChannel.AsyncData.ConstructInPlace();
+ outputChannel.AsyncData->Watermark = std::move(ev->Get()->Watermark);
outputChannel.AsyncData->Data = std::move(ev->Get()->Data);
outputChannel.AsyncData->Checkpoint = std::move(ev->Get()->Checkpoint);
outputChannel.AsyncData->Finished = ev->Get()->Finished;
@@ -519,10 +575,23 @@ private:
ProcessOutputsState.HasDataToSend |= !outputChannel.Finished;
ProcessOutputsState.LastPopReturnedNoData = asyncData.Data.empty();
+ if (asyncData.Watermark.Defined()) {
+ const auto watermark = TInstant::MicroSeconds(asyncData.Watermark->GetTimestampUs());
+ const bool shouldResumeInputs = WatermarksTracker.NotifyOutputChannelWatermarkSent(
+ outputChannel.ChannelId,
+ watermark
+ );
+
+ if (shouldResumeInputs) {
+ ResumeInputsByWatermark(watermark);
+ }
+ }
+
if (!shouldSkipData) {
if (asyncData.Checkpoint.Defined()) {
- ResumeInputs();
+ ResumeInputsByCheckpoint();
}
+
for (ui32 i = 0; i < asyncData.Data.size(); i++) {
auto& chunk = asyncData.Data[i];
NDqProto::TChannelData channelData;
@@ -530,16 +599,22 @@ private:
// set finished only for last chunk
const bool lastChunk = i == asyncData.Data.size() - 1;
channelData.SetFinished(asyncData.Finished && lastChunk);
+ channelData.MutableData()->Swap(&chunk);
+ if (lastChunk && asyncData.Watermark.Defined()) {
+ channelData.MutableWatermark()->Swap(&*asyncData.Watermark);
+ }
if (lastChunk && asyncData.Checkpoint.Defined()) {
channelData.MutableCheckpoint()->Swap(&*asyncData.Checkpoint);
}
- channelData.MutableData()->Swap(&chunk);
Channels->SendChannelData(std::move(channelData));
}
if (asyncData.Data.empty() && asyncData.Changed) {
NDqProto::TChannelData channelData;
channelData.SetChannelId(outputChannel.ChannelId);
channelData.SetFinished(asyncData.Finished);
+ if (asyncData.Watermark.Defined()) {
+ channelData.MutableWatermark()->Swap(&*asyncData.Watermark);
+ }
if (asyncData.Checkpoint.Defined()) {
channelData.MutableCheckpoint()->Swap(&*asyncData.Checkpoint);
}
@@ -573,6 +648,19 @@ private:
auto it = TakeInputChannelDataRequests.find(ev->Cookie);
YQL_ENSURE(it != TakeInputChannelDataRequests.end());
+ CA_LOG_T("Input data push finished. Cookie: " << ev->Cookie
+ << " Watermark: " << it->second.Watermark
+ << " Ack: " << it->second.Ack
+ << " TakeInputChannelDataRequests: " << TakeInputChannelDataRequests.size()
+ << " WatermarkTakeInputChannelDataRequests: " << WatermarkTakeInputChannelDataRequests.size());
+
+ if (it->second.Watermark.Defined()) {
+ auto& ct = WatermarkTakeInputChannelDataRequests.at(*it->second.Watermark);
+ if (--ct == 0) {
+ WatermarkTakeInputChannelDataRequests.erase(*it->second.Watermark);
+ }
+ }
+
if (it->second.Ack) {
TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr(it->second.ChannelId);
Channels->SendChannelDataAck(it->second.ChannelId, inputChannel->FreeSpace);
@@ -602,7 +690,7 @@ private:
}
if (checkpoint) {
CA_LOG_I("Resume inputs");
- ResumeInputs();
+ ResumeInputsByCheckpoint();
}
sinkInfo.PopStarted = false;
@@ -745,6 +833,7 @@ private:
ProcessContinueRun();
}
+private:
NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv = nullptr;
NTaskRunnerActor::ITaskRunnerActor* TaskRunnerActor = nullptr;
NActors::TActorId TaskRunnerActorId;
@@ -762,8 +851,12 @@ private:
struct TTakeInputChannelData {
bool Ack;
ui64 ChannelId;
+ TMaybe<TInstant> Watermark;
};
THashMap<ui64, TTakeInputChannelData> TakeInputChannelDataRequests;
+ // Watermark should be injected to task runner only after all precending data is injected
+ // This hash map will help to track the right moment
+ THashMap<TInstant, ui32> WatermarkTakeInputChannelDataRequests;
ui64 Cookie = 0;
NDq::TDqTaskRunnerStatsView TaskRunnerStats;
bool ReadyToCheckpointFlag;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index 6be9f937132..c64e3255973 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -69,9 +69,14 @@ struct IDqComputeActorAsyncInput {
virtual ui64 GetInputIndex() const = 0;
// Gets data and returns space used by filled data batch.
+ // Watermark will be returned if source watermark was moved forward. Watermark should be handled AFTER data.
// Method should be called under bound mkql allocator.
// Could throw YQL errors.
- virtual i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64 freeSpace) = 0;
+ virtual i64 GetAsyncInputData(
+ NKikimr::NMiniKQL::TUnboxedValueVector& batch,
+ TMaybe<TInstant>& watermark,
+ bool& finished,
+ i64 freeSpace) = 0;
// Checkpointing.
virtual void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TSourceState& state) = 0;
@@ -150,6 +155,7 @@ public:
const NDqProto::TTaskInput& InputDesc;
ui64 InputIndex;
TTxId TxId;
+ ui64 TaskId;
const THashMap<TString, TString>& SecureParams;
const THashMap<TString, TString>& TaskParams;
const NActors::TActorId& ComputeActorId;
@@ -161,6 +167,7 @@ public:
const NDqProto::TTaskOutput& OutputDesc;
ui64 OutputIndex;
TTxId TxId;
+ ui64 TaskId;
IDqComputeActorAsyncOutput::ICallbacks* Callback;
const THashMap<TString, TString>& SecureParams;
const THashMap<TString, TString>& TaskParams;
@@ -173,6 +180,7 @@ public:
const NDqProto::TTaskInput& InputDesc;
const ui64 InputIndex;
TTxId TxId;
+ ui64 TaskId;
const NUdf::TUnboxedValue TransformInput;
const THashMap<TString, TString>& SecureParams;
const THashMap<TString, TString>& TaskParams;
@@ -186,6 +194,7 @@ public:
const NDqProto::TTaskOutput& OutputDesc;
const ui64 OutputIndex;
TTxId TxId;
+ ui64 TaskId;
const IDqOutputConsumer::TPtr TransformOutput;
IDqComputeActorAsyncOutput::ICallbacks* Callback;
const THashMap<TString, TString>& SecureParams;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
index f4fdf0569da..e4c6b2f8558 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp
@@ -109,6 +109,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelData::TPtr& ev)
<< ", seqNo: " << record.GetSeqNo()
<< ", size: " << channelData.GetData().GetRaw().size()
<< ", rows: " << channelData.GetData().GetRows()
+ << ", watermark: " << channelData.HasWatermark()
<< ", checkpoint: " << channelData.HasCheckpoint()
<< ", finished: " << channelData.GetFinished()
<< ", from: " << ev->Sender
@@ -555,6 +556,7 @@ void TDqComputeActorChannels::SendChannelData(NDqProto::TChannelData&& channelDa
<< ", peer: " << *outputChannel.Peer
<< ", rows: " << chunkRows
<< ", bytes: " << chunkBytes
+ << ", watermark: " << channelData.HasWatermark()
<< ", checkpoint: " << channelData.HasCheckpoint()
<< ", seqNo: " << seqNo
<< ", finished: " << finished);
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
index f47e15c2666..1f6956773d7 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
@@ -226,7 +226,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvNewCheckpointCoordinato
AbortCheckpoint();
if (resumeInputs) {
LOG_W("Drop pending checkpoint since coordinator is stale");
- ComputeActor->ResumeInputs();
+ ComputeActor->ResumeInputsByCheckpoint();
}
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
index 61c007708da..bbfb4a327fc 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h
@@ -72,7 +72,7 @@ public:
virtual void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const = 0;
virtual void CommitState(const NDqProto::TCheckpoint& checkpoint) = 0;
virtual void InjectBarrierToOutputs(const NDqProto::TCheckpoint& checkpoint) = 0;
- virtual void ResumeInputs() = 0;
+ virtual void ResumeInputsByCheckpoint() = 0;
virtual void Start() = 0;
virtual void Stop() = 0;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index fd667d846df..eab5bcf8a0b 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1,9 +1,11 @@
#pragma once
-#include "dq_compute_actor.h"
+#include "dq_compute_actor_async_io.h"
#include "dq_compute_actor_channels.h"
#include "dq_compute_actor_checkpoints.h"
-#include "dq_compute_actor_async_io.h"
+#include "dq_compute_actor_metrics.h"
+#include "dq_compute_actor_watermarks.h"
+#include "dq_compute_actor.h"
#include "dq_compute_issues_buffer.h"
#include "dq_compute_memory_quota.h"
@@ -28,27 +30,28 @@
#include <util/system/hostname.h>
#include <any>
+#include <queue>
#if defined CA_LOG_D || defined CA_LOG_I || defined CA_LOG_E || defined CA_LOG_C
# error log macro definition clash
#endif
#define CA_LOG_T(s) \
- LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s)
+ LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define CA_LOG_D(s) \
- LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s)
+ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define CA_LOG_I(s) \
- LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s)
+ LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define CA_LOG_W(s) \
- LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s)
+ LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define CA_LOG_N(s) \
- LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s)
+ LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define CA_LOG_E(s) \
- LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s)
+ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define CA_LOG_C(s) \
- LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s)
+ LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
#define CA_LOG(prio, s) \
- LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s)
+ LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s)
namespace NYql {
@@ -176,6 +179,7 @@ protected:
: ExecuterId(executerId)
, TxId(txId)
, Task(std::move(task))
+ , LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". ")
, RuntimeSettings(settings)
, MemoryLimits(memoryLimits)
, CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn)
@@ -184,6 +188,8 @@ protected:
, CheckpointingMode(GetTaskCheckpointingMode(Task))
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
, MemoryQuota(ownMemoryQuota ? InitMemoryQuota() : nullptr)
+ , WatermarksTracker(this->SelfId(), TxId, Task.GetId())
+ , DqComputeActorMetrics(taskCounters)
, Running(!Task.GetCreateSuspended())
, PassExceptions(passExceptions)
, ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor")
@@ -193,6 +199,7 @@ protected:
}
InitializeTask();
InitMonCounters(taskCounters);
+ InitializeWatermarks();
}
TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, const NDqProto::TDqTask& task,
@@ -211,6 +218,8 @@ protected:
, FunctionRegistry(functionRegistry)
, State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING)
, MemoryQuota(InitMemoryQuota())
+ , WatermarksTracker(this->SelfId(), TxId, Task.GetId())
+ , DqComputeActorMetrics(taskCounters)
, Running(!Task.GetCreateSuspended())
, ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor")
{
@@ -219,6 +228,7 @@ protected:
}
InitializeTask();
InitMonCounters(taskCounters);
+ InitializeWatermarks();
}
void InitMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) {
@@ -775,9 +785,33 @@ protected:
}
}
- void ResumeInputs() override {
+ void ResumeInputsByWatermark(TInstant watermark) {
+ for (auto& [id, sourceInfo] : SourcesMap) {
+ if (sourceInfo.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) {
+ continue;
+ }
+
+ const auto channelId = id;
+ CA_LOG_T("Resume source " << channelId << " by completed watermark");
+
+ sourceInfo.ResumeByWatermark(watermark);
+ }
+
for (auto& [id, channelInfo] : InputChannelsMap) {
- channelInfo.Resume();
+ if (channelInfo.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) {
+ continue;
+ }
+
+ const auto channelId = id;
+ CA_LOG_T("Resume input channel " << channelId << " by completed watermark");
+
+ channelInfo.ResumeByWatermark(watermark);
+ }
+ }
+
+ void ResumeInputsByCheckpoint() override {
+ for (auto& [id, channelInfo] : InputChannelsMap) {
+ channelInfo.ResumeByCheckpoint();
}
}
@@ -839,25 +873,40 @@ protected:
protected:
struct TInputChannelInfo {
+ const TString LogPrefix;
ui64 ChannelId;
IDqInputChannel::TPtr Channel;
bool HasPeer = false;
+ std::queue<TInstant> PendingWatermarks;
+ const NDqProto::EWatermarksMode WatermarksMode;
std::optional<NDqProto::TCheckpoint> PendingCheckpoint;
const NDqProto::ECheckpointingMode CheckpointingMode;
ui64 FreeSpace = 0;
- explicit TInputChannelInfo(ui64 channelId, NDqProto::ECheckpointingMode checkpointingMode)
- : ChannelId(channelId)
+ explicit TInputChannelInfo(
+ const TString& logPrefix,
+ ui64 channelId,
+ NDqProto::EWatermarksMode watermarksMode,
+ NDqProto::ECheckpointingMode checkpointingMode)
+ : LogPrefix(logPrefix)
+ , ChannelId(channelId)
+ , WatermarksMode(watermarksMode)
, CheckpointingMode(checkpointingMode)
{
}
bool IsPaused() const {
- return PendingCheckpoint.has_value();
+ return PendingWatermarks.empty() || PendingCheckpoint.has_value();
+ }
+
+ void Pause(TInstant watermark) {
+ YQL_ENSURE(WatermarksMode != NDqProto::WATERMARKS_MODE_DISABLED);
+
+ PendingWatermarks.emplace(watermark);
}
void Pause(const NDqProto::TCheckpoint& checkpoint) {
- YQL_ENSURE(!IsPaused());
+ YQL_ENSURE(!PendingCheckpoint);
YQL_ENSURE(CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED);
PendingCheckpoint = checkpoint;
if (Channel) { // async actor doesn't hold channels, so channel is paused in task runner actor
@@ -865,7 +914,17 @@ protected:
}
}
- void Resume() {
+ void ResumeByWatermark(TInstant watermark) {
+ while (!PendingWatermarks.empty() && PendingWatermarks.front() <= watermark) {
+ if (PendingWatermarks.front() != watermark) {
+ CA_LOG_W("Input channel " << ChannelId <<
+ " watermarks were collapsed. See YQ-1441. Dropped watermark: " << PendingWatermarks.front());
+ }
+ PendingWatermarks.pop();
+ }
+ }
+
+ void ResumeByCheckpoint() {
PendingCheckpoint.reset();
if (Channel) { // async actor doesn't hold channels, so channel is resumed in task runner actor
Channel->Resume();
@@ -874,6 +933,7 @@ protected:
};
struct TAsyncInputInfoBase {
+ const TString LogPrefix;
ui64 Index;
IDqAsyncInputBuffer::TPtr Buffer;
IDqComputeActorAsyncInput* AsyncInput = nullptr;
@@ -882,8 +942,31 @@ protected:
bool Finished = false;
i64 FreeSpace = 1;
bool PushStarted = false;
+ const NDqProto::EWatermarksMode WatermarksMode = NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED;
+ TMaybe<TInstant> PendingWatermark = Nothing();
+
+ explicit TAsyncInputInfoBase(
+ const TString& logPrefix,
+ ui64 index,
+ NDqProto::EWatermarksMode watermarksMode)
+ : LogPrefix(logPrefix)
+ , Index(index)
+ , IssuesBuffer(IssuesBufferSize)
+ , WatermarksMode(watermarksMode) {}
+
+ bool IsPausedByWatermark() {
+ return PendingWatermark.Defined();
+ }
- explicit TAsyncInputInfoBase(ui64 index) : Index(index), IssuesBuffer(IssuesBufferSize) {}
+ void Pause(TInstant watermark) {
+ YQL_ENSURE(WatermarksMode != NDqProto::WATERMARKS_MODE_DISABLED);
+ PendingWatermark = watermark;
+ }
+
+ void ResumeByWatermark(TInstant watermark) {
+ YQL_ENSURE(watermark == PendingWatermark);
+ PendingWatermark = Nothing();
+ }
};
struct TAsyncInputTransformInfo : public TAsyncInputInfoBase {
@@ -900,6 +983,7 @@ protected:
bool Finished = false; // != Channel->IsFinished() // If channel is in finished state, it sends only checkpoints.
bool PopStarted = false;
bool IsTransformOutput = false; // Is this channel output of a transform.
+ NDqProto::EWatermarksMode WatermarksMode = NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED;
explicit TOutputChannelInfo(ui64 channelId)
: ChannelId(channelId)
@@ -913,6 +997,7 @@ protected:
struct TAsyncData { // Is used in case of async compute actor
TVector<NDqProto::TData> Data;
+ TMaybe<NDqProto::TWatermark> Watermark;
TMaybe<NDqProto::TCheckpoint> Checkpoint;
bool Finished = false;
bool Changed = false;
@@ -1133,7 +1218,6 @@ protected:
this->TerminateSources(issues, success);
if (ev->Sender != ExecuterId) {
-
if (ComputeActorSpan) {
ComputeActorSpan.End();
}
@@ -1217,11 +1301,13 @@ private:
auto channel = outputChannel.Channel;
NDqProto::TData data;
+ NDqProto::TWatermark watermark;
NDqProto::TCheckpoint checkpoint;
bool hasData = channel->Pop(data, bytes);
+ bool hasWatermark = channel->Pop(watermark);
bool hasCheckpoint = channel->Pop(checkpoint);
- if (!hasData && !hasCheckpoint) {
+ if (!hasData && !hasWatermark && !hasCheckpoint) {
if (!channel->IsFinished()) {
CA_LOG_D("output channelId: " << channel->GetChannelId() << ", nothing to send and is not finished");
return 0; // channel is empty and not finished yet
@@ -1232,6 +1318,7 @@ private:
const bool becameFinished = !wasFinished && outputChannel.Finished;
ui32 dataSize = data.GetRaw().size();
+ ui32 watermarkSize = watermark.ByteSize();
ui32 checkpointSize = checkpoint.ByteSize();
NDqProto::TChannelData channelData;
@@ -1240,15 +1327,22 @@ private:
if (hasData) {
channelData.MutableData()->Swap(&data);
}
+ if (hasWatermark) {
+ channelData.MutableWatermark()->Swap(&watermark);
+ CA_LOG_I("Resume inputs by watermark");
+ // This is excessive, inputs should be resumed after async CA received response with watermark from task runner.
+ // But, let it be here, it's better to have the same code as in checkpoints
+ ResumeInputsByWatermark(TInstant::MicroSeconds(watermark.GetTimestampUs()));
+ }
if (hasCheckpoint) {
channelData.MutableCheckpoint()->Swap(&checkpoint);
- CA_LOG_I("Resume inputs");
- ResumeInputs();
+ CA_LOG_I("Resume inputs by checkpoint");
+ ResumeInputsByCheckpoint();
}
- if (hasData || hasCheckpoint || becameFinished) {
+ if (hasData || hasWatermark || hasCheckpoint || becameFinished) {
Channels->SendChannelData(std::move(channelData));
- return dataSize + checkpointSize;
+ return dataSize + watermarkSize + checkpointSize;
}
return 0;
}
@@ -1315,7 +1409,7 @@ private:
if (hasCheckpoint) {
maybeCheckpoint = checkpoint;
CA_LOG_I("Resume inputs");
- ResumeInputs();
+ ResumeInputsByCheckpoint();
}
outputInfo.AsyncOutput->SendData(std::move(dataBatch), dataSize, maybeCheckpoint, outputInfo.Finished);
@@ -1423,6 +1517,7 @@ protected:
.InputDesc = inputDesc,
.InputIndex = inputIndex,
.TxId = TxId,
+ .TaskId = Task.GetId(),
.TransformInput = transform.InputBuffer,
.SecureParams = secureParams,
.TaskParams = taskParams,
@@ -1498,16 +1593,24 @@ protected:
void PollAsyncInput(TAsyncInputInfoBase& info, ui64 inputIndex) {
Y_VERIFY(!TaskRunner || info.Buffer);
+
if (info.Finished) {
CA_LOG_D("Skip polling async input[" << inputIndex << "]: finished");
return;
}
+
+ if (info.IsPausedByWatermark()) {
+ CA_LOG_T("Skip polling async input[" << inputIndex << "]: paused");
+ return;
+ }
+
const i64 freeSpace = AsyncIoFreeSpace(info);
if (freeSpace > 0) {
+ TMaybe<TInstant> watermark;
NKikimr::NMiniKQL::TUnboxedValueVector batch;
Y_VERIFY(info.AsyncInput);
bool finished = false;
- const i64 space = info.AsyncInput->GetAsyncInputData(batch, finished, freeSpace);
+ const i64 space = info.AsyncInput->GetAsyncInputData(batch, watermark, finished, freeSpace);
CA_LOG_T("Poll async input " << inputIndex
<< ". Buffer free space: " << freeSpace
<< ", read from async input: " << space << " bytes, "
@@ -1519,6 +1622,20 @@ protected:
// but we haven't read all of it.
ContinueExecute();
}
+
+ DqComputeActorMetrics.ReportAsyncInputData(inputIndex, batch.size(), watermark);
+
+ if (watermark) {
+ const auto inputWatermarkChanged = WatermarksTracker.NotifyAsyncInputWatermarkReceived(
+ inputIndex,
+ *watermark);
+
+ if (inputWatermarkChanged) {
+ CA_LOG_T("Pause async input " << inputIndex << " because of watermark " << *watermark);
+ info.Pause(*watermark);
+ }
+ }
+
AsyncInputPush(std::move(batch), info, space, finished);
}
}
@@ -1628,16 +1745,28 @@ private:
Y_VERIFY(!inputDesc.HasSource() || inputDesc.ChannelsSize() == 0); // HasSource => no channels
if (inputDesc.HasTransform()) {
- auto result = InputTransformsMap.emplace(std::piecewise_construct, std::make_tuple(i), std::make_tuple(i));
+ auto result = InputTransformsMap.emplace(
+ std::piecewise_construct,
+ std::make_tuple(i),
+ std::make_tuple(LogPrefix, i, NDqProto::WATERMARKS_MODE_DISABLED)
+ );
YQL_ENSURE(result.second);
}
if (inputDesc.HasSource()) {
- auto result = SourcesMap.emplace(std::piecewise_construct, std::make_tuple(i), std::make_tuple(i));
+ const auto watermarksMode = inputDesc.GetSource().GetWatermarksMode();
+ auto result = SourcesMap.emplace(i, TAsyncInputInfoBase(LogPrefix, i, watermarksMode));
YQL_ENSURE(result.second);
} else {
for (auto& channel : inputDesc.GetChannels()) {
- auto result = InputChannelsMap.emplace(channel.GetId(), TInputChannelInfo(channel.GetId(), channel.GetCheckpointingMode()));
+ auto result = InputChannelsMap.emplace(
+ channel.GetId(),
+ TInputChannelInfo(
+ LogPrefix,
+ channel.GetId(),
+ channel.GetWatermarksMode(),
+ channel.GetCheckpointingMode())
+ );
YQL_ENSURE(result.second);
}
}
@@ -1661,6 +1790,7 @@ private:
TOutputChannelInfo outputChannel(channel.GetId());
outputChannel.HasPeer = channel.GetDstEndpoint().HasActorId();
outputChannel.IsTransformOutput = outputDesc.HasTransform();
+ outputChannel.WatermarksMode = channel.GetWatermarksMode();
if (Y_UNLIKELY(RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_PROFILE)) {
outputChannel.Stats = MakeHolder<typename TOutputChannelInfo::TStats>();
@@ -1673,6 +1803,26 @@ private:
}
}
+ void InitializeWatermarks() {
+ for (const auto& [id, source] : SourcesMap) {
+ if (source.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DEFAULT) {
+ WatermarksTracker.RegisterAsyncInput(id);
+ }
+ }
+
+ for (const auto& [id, channel] : InputChannelsMap) {
+ if (channel.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DEFAULT) {
+ WatermarksTracker.RegisterInputChannel(id);
+ }
+ }
+
+ for (const auto& [id, channel] : OutputChannelsMap) {
+ if (channel.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DEFAULT) {
+ WatermarksTracker.RegisterOutputChannel(id);
+ }
+ }
+ }
+
virtual const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() {
if (!TaskRunner) {
return nullptr;
@@ -1815,6 +1965,7 @@ protected:
const NActors::TActorId ExecuterId;
const TTxId TxId;
const NDqProto::TDqTask Task;
+ const TString LogPrefix;
const TComputeRuntimeSettings RuntimeSettings;
const TComputeMemoryLimits MemoryLimits;
const bool CanAllocateExtraMemory = false;
@@ -1850,6 +2001,8 @@ protected:
TProcessOutputsState ProcessOutputsState;
THolder<TDqMemoryQuota> MemoryQuota;
+ TDqComputeActorWatermarks WatermarksTracker;
+ TDqComputeActorMetrics DqComputeActorMetrics;
private:
bool Running = true;
TInstant LastSendStatsTime;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp
new file mode 100644
index 00000000000..a6547c94aa5
--- /dev/null
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp
@@ -0,0 +1,93 @@
+#include "dq_compute_actor_metrics.h"
+
+namespace NYql::NDq {
+
+TDqComputeActorMetrics::TDqComputeActorMetrics(const NMonitoring::TDynamicCounterPtr& counters) {
+ if (!counters) {
+ return;
+ }
+
+ ComputeActorSubgroup = counters->GetSubgroup("subsystem", "compute_actor");
+ InjectedToTaskRunnerWatermark = ComputeActorSubgroup->GetCounter("watermark_injected_ms");
+ InjectedToOutputsWatermark = ComputeActorSubgroup->GetCounter("watermark_outputs_ms");
+ WatermarkCollectLatency = ComputeActorSubgroup->GetHistogram(
+ "watermark_collect_ms",
+ NMonitoring::ExplicitHistogram({0, 15, 50, 100, 250, 500, 1000, 10'000, 100'000}));
+}
+
+void TDqComputeActorMetrics::ReportAsyncInputData(ui32 id, ui64 dataSize, TMaybe<TInstant> watermark) {
+ if (!Enable) {
+ return;
+ }
+
+ auto counters = GetAsyncInputCounters(id);
+ counters->GetCounter("rows", true)->Add(dataSize);
+
+ if (!watermark) {
+ return;
+ }
+
+ ReportInputWatermarkMetrics(counters, *watermark);
+}
+
+void TDqComputeActorMetrics::ReportInputChannelWatermark(ui32 id, ui64 dataSize, TMaybe<TInstant> watermark) {
+ if (!Enable) {
+ return;
+ }
+
+ auto counters = GetInputChannelCounters(id);
+ counters->GetCounter("rows", true)->Add(dataSize);
+ if (!watermark) {
+ return;
+ }
+
+ ReportInputWatermarkMetrics(counters, *watermark);
+}
+
+void TDqComputeActorMetrics::ReportInjectedToTaskRunnerWatermark(TInstant watermark) {
+ if (!Enable) {
+ return;
+ }
+
+ InjectedToTaskRunnerWatermark->Set(watermark.MilliSeconds());
+}
+
+void TDqComputeActorMetrics::ReportInjectedToOutputsWatermark(TInstant watermark) {
+ if (!Enable) {
+ return;
+ }
+
+ InjectedToOutputsWatermark->Set(watermark.MilliSeconds());
+ auto iter = WatermarkStartedAt.find(watermark);
+ if (iter != WatermarkStartedAt.end()) {
+ WatermarkCollectLatency->Collect((TInstant::Now() - iter->second).MilliSeconds());
+ WatermarkStartedAt.erase(iter);
+ }
+}
+
+NMonitoring::TDynamicCounterPtr TDqComputeActorMetrics::GetAsyncInputCounters(ui32 id) {
+ auto iter = AsyncInputsCounters.find(id);
+ if (iter == AsyncInputsCounters.end()) {
+ iter = AsyncInputsCounters.emplace(id, ComputeActorSubgroup->GetSubgroup("async_input", ToString(id))).first;
+ }
+
+ return iter->second;
+}
+
+NMonitoring::TDynamicCounterPtr TDqComputeActorMetrics::GetInputChannelCounters(ui32 id) {
+ auto iter = InputChannelsCounters.find(id);
+ if (iter == InputChannelsCounters.end()) {
+ iter = InputChannelsCounters.emplace(id, ComputeActorSubgroup->GetSubgroup("input_channel", ToString(id))).first;
+ }
+
+ return iter->second;
+}
+
+void TDqComputeActorMetrics::ReportInputWatermarkMetrics(NMonitoring::TDynamicCounterPtr& counters, TInstant watermark) {
+ counters->GetCounter("watermark_ms")->Set(watermark.MilliSeconds());
+ if (!WatermarkStartedAt.contains(watermark)) {
+ WatermarkStartedAt[watermark] = TInstant::Now();
+ }
+}
+
+}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h
new file mode 100644
index 00000000000..3e9afbf64ae
--- /dev/null
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <util/generic/hash.h>
+
+namespace NYql::NDq {
+
+struct TDqComputeActorMetrics {
+public:
+ TDqComputeActorMetrics(const NMonitoring::TDynamicCounterPtr& counters);
+
+ void ReportAsyncInputData(ui32 id, ui64 dataSize, TMaybe<TInstant> watermark);
+ void ReportInputChannelWatermark(ui32 id, ui64 dataSize, TMaybe<TInstant> watermark);
+ void ReportInjectedToTaskRunnerWatermark(TInstant watermark);
+ void ReportInjectedToOutputsWatermark(TInstant watermark);
+
+private:
+ NMonitoring::TDynamicCounterPtr GetAsyncInputCounters(ui32 id);
+ NMonitoring::TDynamicCounterPtr GetInputChannelCounters(ui32 id);
+ void ReportInputWatermarkMetrics(NMonitoring::TDynamicCounterPtr& counters, TInstant watermark);
+
+private:
+ bool Enable = false;
+ NMonitoring::TDynamicCounterPtr ComputeActorSubgroup;
+ THashMap<ui32, NMonitoring::TDynamicCounterPtr> AsyncInputsCounters;
+ THashMap<ui32, NMonitoring::TDynamicCounterPtr> InputChannelsCounters;
+ NMonitoring::TDynamicCounters::TCounterPtr WatermarkCt;
+ NMonitoring::TDynamicCounters::TCounterPtr InjectedToTaskRunnerWatermark;
+ NMonitoring::TDynamicCounters::TCounterPtr InjectedToOutputsWatermark;
+ NMonitoring::THistogramPtr WatermarkCollectLatency;
+
+ THashMap<TInstant, TInstant> WatermarkStartedAt;
+};
+
+}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp
new file mode 100644
index 00000000000..d99efbdffe4
--- /dev/null
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp
@@ -0,0 +1,169 @@
+#include "dq_compute_actor_watermarks.h"
+
+#include <ydb/core/protos/services.pb.h>
+
+#include <ydb/library/yql/minikql/comp_nodes/mkql_saveload.h>
+
+#include <algorithm>
+
+#define LOG_T(s) \
+ LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s)
+#define LOG_D(s) \
+ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s)
+#define LOG_I(s) \
+ LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s)
+#define LOG_W(s) \
+ LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s)
+#define LOG_E(s) \
+ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s)
+
+namespace NYql::NDq {
+
+using namespace NActors;
+
+TDqComputeActorWatermarks::TDqComputeActorWatermarks(
+ NActors::TActorIdentity selfId,
+ const TTxId txId,
+ ui64 taskId
+)
+ : SelfId(selfId)
+ , TxId(txId)
+ , TaskId(taskId) {
+}
+
+void TDqComputeActorWatermarks::RegisterAsyncInput(ui64 inputId) {
+ AsyncInputsWatermarks[inputId] = Nothing();
+}
+
+void TDqComputeActorWatermarks::RegisterInputChannel(ui64 inputId) {
+ InputChannelsWatermarks[inputId] = Nothing();
+}
+
+void TDqComputeActorWatermarks::RegisterOutputChannel(ui64 outputId) {
+ OutputChannelsWatermarks[outputId] = Nothing();
+}
+
+bool TDqComputeActorWatermarks::HasOutputChannels() const {
+ return !OutputChannelsWatermarks.empty();
+}
+
+bool TDqComputeActorWatermarks::NotifyAsyncInputWatermarkReceived(ui64 inputId, TInstant watermark) {
+ LOG_T("Async input " << inputId << " notified about watermark " << watermark);
+
+ auto& asyncInputWatermark = AsyncInputsWatermarks[inputId];
+ if (!asyncInputWatermark || *asyncInputWatermark < watermark) {
+ LOG_T("Async input " << inputId << " watermark was updated to " << watermark);
+ asyncInputWatermark = watermark;
+ RecalcPendingWatermark();
+ return true;
+ }
+
+ return false;
+}
+
+bool TDqComputeActorWatermarks::NotifyInChannelWatermarkReceived(ui64 inputId, TInstant watermark) {
+ LOG_T("Input channel " << inputId << " notified about watermark " << watermark);
+
+ auto& inputChannelWatermark = InputChannelsWatermarks[inputId];
+ if (!inputChannelWatermark || *inputChannelWatermark < watermark) {
+ LOG_T("Input channel " << inputId << " watermark was updated to " << watermark);
+ inputChannelWatermark = watermark;
+ RecalcPendingWatermark();
+ return true;
+ }
+
+ return false;
+}
+
+bool TDqComputeActorWatermarks::NotifyOutputChannelWatermarkSent(ui64 outputId, TInstant watermark) {
+ auto logPrefix = TStringBuilder() << "Output channel "
+ << outputId << " notified about watermark '" << watermark << "'";
+
+ LOG_T(logPrefix);
+
+ if (watermark <= LastWatermark) {
+ LOG_E(logPrefix << "' when LastWatermark was already forwarded to " << *LastWatermark);
+ // We will try to ignore this error, but something strange happened
+ }
+
+ if (watermark != PendingWatermark) {
+ LOG_E(logPrefix << " when '" << PendingWatermark << "' was expected");
+ // We will try to ignore this error, but something strange happened
+ }
+
+ OutputChannelsWatermarks[outputId] = watermark;
+
+ return MaybePopPendingWatermark();
+}
+
+bool TDqComputeActorWatermarks::HasPendingWatermark() const {
+ return PendingWatermark.Defined();
+}
+
+TMaybe<TInstant> TDqComputeActorWatermarks::GetPendingWatermark() const {
+ return PendingWatermark;
+}
+
+void TDqComputeActorWatermarks::RecalcPendingWatermark() {
+ if (AsyncInputsWatermarks.empty() && InputChannelsWatermarks.empty()) {
+ return;
+ }
+
+ auto newWatermark = TInstant::Max();
+ for (const auto& [_, watermark] : AsyncInputsWatermarks) {
+ if (!watermark) {
+ return;
+ }
+
+ newWatermark = std::min(newWatermark, *watermark);
+ }
+
+ for (const auto& [_, watermark] : InputChannelsWatermarks) {
+ if (!watermark) {
+ return;
+ }
+
+ newWatermark = std::min(newWatermark, *watermark);
+ }
+
+ if (!LastWatermark || newWatermark != LastWatermark) {
+ LOG_T("New pending watermark " << newWatermark);
+ PendingWatermark = newWatermark;
+ }
+}
+
+bool TDqComputeActorWatermarks::MaybePopPendingWatermark() {
+ if (OutputChannelsWatermarks.empty()) {
+ return true;
+ }
+
+ if (!PendingWatermark) {
+ LOG_E("There is no pending watermark, but pop was called");
+ // We will try to ignore this error, but something strange happened
+ return true;
+ }
+
+ auto outWatermark = TInstant::Max();
+ for (const auto& [_, watermark] : OutputChannelsWatermarks) {
+ if (!watermark) {
+ return false;
+ }
+
+ outWatermark = std::min(outWatermark, *watermark);
+ }
+
+ if (outWatermark >= *PendingWatermark) {
+ LastWatermark = PendingWatermark;
+ PopPendingWatermark();
+ return true;
+ }
+
+ return false;
+}
+
+void TDqComputeActorWatermarks::PopPendingWatermark() {
+ LOG_T("Watermark " << *PendingWatermark << " was popped. ");
+ PendingWatermark = Nothing();
+}
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h
new file mode 100644
index 00000000000..183fbf73dcb
--- /dev/null
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h
@@ -0,0 +1,52 @@
+#pragma once
+
+#include <ydb/library/yql/dq/common/dq_common.h>
+
+#include <library/cpp/actors/core/log.h>
+
+namespace NYql::NDq {
+
+class TDqComputeActorWatermarks
+{
+public:
+ TDqComputeActorWatermarks(NActors::TActorIdentity selfId, const TTxId graphId, ui64 taskId);
+
+ void RegisterAsyncInput(ui64 inputId);
+ void RegisterInputChannel(ui64 inputId);
+ void RegisterOutputChannel(ui64 outputId);
+ bool HasOutputChannels() const;
+
+ // Will return true, if local watermark inside this async input was moved forward.
+ // CA should pause this async input and wait for coresponding watermarks in all other sources/inputs.
+ bool NotifyAsyncInputWatermarkReceived(ui64 inputId, TInstant watermark);
+
+ // Will return true, if local watermark inside this input channel was moved forward.
+ // CA should pause this input channel and wait for coresponding watermarks in all other sources/inputs.
+ bool NotifyInChannelWatermarkReceived(ui64 inputId, TInstant watermark);
+
+ // Will return true, if watermark was sent to all registered outputs.
+ // CA should resume inputs and sources in this case
+ bool NotifyOutputChannelWatermarkSent(ui64 outputId, TInstant watermark);
+
+ bool HasPendingWatermark() const;
+ TMaybe<TInstant> GetPendingWatermark() const;
+ void PopPendingWatermark();
+
+private:
+ void RecalcPendingWatermark();
+ bool MaybePopPendingWatermark();
+
+private:
+ const NActors::TActorIdentity SelfId;
+ const TTxId TxId;
+ ui64 TaskId;
+
+ std::unordered_map<ui64, TMaybe<TInstant>> AsyncInputsWatermarks;
+ std::unordered_map<ui64, TMaybe<TInstant>> InputChannelsWatermarks;
+ std::unordered_map<ui64, TMaybe<TInstant>> OutputChannelsWatermarks;
+
+ TMaybe<TInstant> PendingWatermark;
+ TMaybe<TInstant> LastWatermark;
+};
+
+} // namespace NYql::NDq
diff --git a/ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h b/ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h
new file mode 100644
index 00000000000..01f8f139393
--- /dev/null
+++ b/ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h
@@ -0,0 +1,80 @@
+#include <util/datetime/base.h>
+#include <util/generic/maybe.h>
+#include <util/system/types.h>
+
+#include <optional>
+#include <unordered_map>
+
+namespace NYql::NDq {
+
+template <typename TPartitionKey>
+struct TDqSourceWatermarkTracker {
+public:
+ TDqSourceWatermarkTracker(
+ TDuration granularity,
+ TInstant startWatermark,
+ ui32 expectedPartitionsCount)
+ : Granularity(granularity)
+ , StartWatermark(ToDiscreteTime(startWatermark))
+ , ExpectedPartitionsCount(expectedPartitionsCount) {}
+
+ TMaybe<TInstant> NotifyNewPartitionTime(const TPartitionKey& partitionKey, TInstant time) {
+ auto granularPartitionTime = ToDiscreteTime(time);
+
+ auto iter = Data.find(partitionKey);
+ if (iter == Data.end()) {
+ Data[partitionKey] = granularPartitionTime;
+ return RecalcWatermark();
+ }
+
+ if (granularPartitionTime <= iter->second) {
+ return Nothing();
+ }
+
+ iter->second = granularPartitionTime;
+ return RecalcWatermark();
+ }
+
+private:
+ TInstant ToDiscreteTime(TInstant time) const {
+ return TInstant::MicroSeconds(time.MicroSeconds() - time.MicroSeconds() % Granularity.MicroSeconds());
+ }
+
+ TMaybe<TInstant> RecalcWatermark() {
+ if (!Watermark) {
+ // We have to inject start watermark before first data item, because some graph nodes can't start
+ // data processing without knowing what the current watermark is.
+ Watermark = StartWatermark;
+ return Watermark;
+ }
+
+ if (Data.size() < ExpectedPartitionsCount) {
+ // Each partition should notify time at least once before we are able to move watermark
+ return Nothing();
+ }
+
+ auto minTime = Data.begin()->second;
+ for (const auto& [_, time] : Data) {
+ if (time < minTime) {
+ minTime = time;
+ }
+ }
+
+ if (minTime > Watermark) {
+ Watermark = minTime;
+ return Watermark;
+ }
+
+ return Nothing();
+ }
+
+private:
+ const TDuration Granularity;
+ const TInstant StartWatermark;
+ const ui32 ExpectedPartitionsCount;
+
+ THashMap<TPartitionKey, TInstant> Data;
+ TMaybe<TInstant> Watermark;
+};
+
+}
diff --git a/ydb/library/yql/dq/actors/compute/ut/CMakeLists.darwin.txt b/ydb/library/yql/dq/actors/compute/ut/CMakeLists.darwin.txt
index 060134b4ccc..653cd18127a 100644
--- a/ydb/library/yql/dq/actors/compute/ut/CMakeLists.darwin.txt
+++ b/ydb/library/yql/dq/actors/compute/ut/CMakeLists.darwin.txt
@@ -32,6 +32,7 @@ target_link_options(ydb-library-yql-dq-actors-compute-ut PRIVATE
)
target_sources(ydb-library-yql-dq-actors-compute-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/ut/dq_compute_issues_buffer_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp
)
add_test(
NAME
diff --git a/ydb/library/yql/dq/actors/compute/ut/CMakeLists.linux.txt b/ydb/library/yql/dq/actors/compute/ut/CMakeLists.linux.txt
index 3b6d0fa0462..0169715a20a 100644
--- a/ydb/library/yql/dq/actors/compute/ut/CMakeLists.linux.txt
+++ b/ydb/library/yql/dq/actors/compute/ut/CMakeLists.linux.txt
@@ -36,6 +36,7 @@ target_link_options(ydb-library-yql-dq-actors-compute-ut PRIVATE
)
target_sources(ydb-library-yql-dq-actors-compute-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/ut/dq_compute_issues_buffer_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp
)
add_test(
NAME
diff --git a/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp b/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp
new file mode 100644
index 00000000000..c27347d22a6
--- /dev/null
+++ b/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp
@@ -0,0 +1,83 @@
+#include <ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NYql::NDq {
+
+Y_UNIT_TEST_SUITE(TDqSourceWatermarkTrackerTest) {
+ Y_UNIT_TEST(StartWatermark) {
+ TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), TInstant::Seconds(11), 2);
+
+ const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(11));
+ UNIT_ASSERT_VALUES_EQUAL(actual1, TInstant::Seconds(10));
+
+ const auto actual2 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(11));
+ UNIT_ASSERT_VALUES_EQUAL(actual2.Defined(), false); // Start watermark was returned already, we shouldn't return it 2-nd time
+ }
+
+ Y_UNIT_TEST(WatermarkMovement1) {
+ const auto startWatermark = TInstant::Seconds(10);
+
+ TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2);
+
+ tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12));
+ const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(12));
+ UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12));
+ }
+
+ Y_UNIT_TEST(WatermarkMovement2) {
+ const auto startWatermark = TInstant::Seconds(10);
+
+ TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2);
+
+ tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12));
+ const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(13));
+ UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12));
+ }
+
+ Y_UNIT_TEST(WatermarkMovement3) {
+ const auto startWatermark = TInstant::Seconds(10);
+
+ TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2);
+
+ tracker.NotifyNewPartitionTime(0, TInstant::Seconds(13));
+ const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(13));
+ UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12));
+ }
+
+ Y_UNIT_TEST(WatermarkMovement4) {
+ const auto startWatermark = TInstant::Seconds(10);
+
+ TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2);
+
+ tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12));
+ const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(14));
+ UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12));
+ }
+
+ Y_UNIT_TEST(WatermarkFarMovement) {
+ const auto startWatermark = TInstant::Seconds(10);
+
+ TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2);
+
+ tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12));
+ tracker.NotifyNewPartitionTime(1, TInstant::Seconds(30));
+ const auto actual = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(30));
+ UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(30));
+ }
+
+ Y_UNIT_TEST(WaitExpectedPartitionsCount) {
+ const auto startWatermark = TInstant::Seconds(10);
+
+ TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2);
+
+ tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12));
+ const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(30));
+ UNIT_ASSERT_VALUES_EQUAL(actual1.Defined(), false); // Since expectedPartitionsCount is 2, we shouldn't move watermark
+
+ const auto actual2 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(30));
+ UNIT_ASSERT_VALUES_EQUAL(actual2, TInstant::Seconds(30));
+ }
+}
+
+}
diff --git a/ydb/library/yql/dq/actors/protos/dq_events.proto b/ydb/library/yql/dq/actors/protos/dq_events.proto
index 56183089cf4..3baf18902e8 100644
--- a/ydb/library/yql/dq/actors/protos/dq_events.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_events.proto
@@ -15,17 +15,23 @@ message TCheckpoint {
optional uint64 Generation = 2;
};
+message TWatermark {
+ optional uint64 TimestampUs = 1;
+};
+
/*
Data and control messages will be processed in the following order:
1) Data
- 2) Checkpoint
- 3) Finished
+ 2) Watermark
+ 3) Checkpoint
+ 4) Finished
*/
message TChannelData {
optional uint64 ChannelId = 1;
optional NYql.NDqProto.TData Data = 2;
optional bool Finished = 3;
optional TCheckpoint Checkpoint = 4;
+ optional TWatermark Watermark = 5;
};
message TEvRun {
diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h
index f9993c12090..8fbb410b1ec 100644
--- a/ydb/library/yql/dq/actors/task_runner/events.h
+++ b/ydb/library/yql/dq/actors/task_runner/events.h
@@ -199,6 +199,7 @@ struct TEvTaskRunFinished
const TDqMemoryQuota::TProfileStats& profileStats = {},
ui64 mkqlMemoryLimit = 0,
THolder<NDqProto::TMiniKqlProgramState>&& programState = nullptr,
+ bool watermarkInjectedToOutputs = false,
bool checkpointRequestedFromTaskRunner = false,
TDuration computeTime = TDuration::Zero())
: RunStatus(runStatus)
@@ -208,6 +209,7 @@ struct TEvTaskRunFinished
, ProfileStats(profileStats)
, MkqlMemoryLimit(mkqlMemoryLimit)
, ProgramState(std::move(programState))
+ , WatermarkInjectedToOutputs(watermarkInjectedToOutputs)
, CheckpointRequestedFromTaskRunner(checkpointRequestedFromTaskRunner)
, ComputeTime(computeTime)
{ }
@@ -220,13 +222,16 @@ struct TEvTaskRunFinished
TDqMemoryQuota::TProfileStats ProfileStats;
ui64 MkqlMemoryLimit = 0;
THolder<NDqProto::TMiniKqlProgramState> ProgramState;
+ bool WatermarkInjectedToOutputs = false;
bool CheckpointRequestedFromTaskRunner = false;
TDuration ComputeTime;
};
struct TEvChannelPopFinished
- : NActors::TEventLocal<TEvChannelPopFinished, TTaskRunnerEvents::ES_POP_FINISHED> {
+ : NActors::TEventLocal<TEvChannelPopFinished, TTaskRunnerEvents::ES_POP_FINISHED>
+{
TEvChannelPopFinished() = default;
+
TEvChannelPopFinished(ui32 channelId)
: Stats()
, ChannelId(channelId)
@@ -234,11 +239,21 @@ struct TEvChannelPopFinished
, Finished(false)
, Changed(false)
{ }
- TEvChannelPopFinished(ui32 channelId, TVector<NDqProto::TData>&& data, TMaybe<NDqProto::TCheckpoint>&& checkpoint, bool finished, bool changed, const TTaskRunnerActorSensors& sensors = {}, TDqTaskRunnerStatsView&& stats = {})
+
+ TEvChannelPopFinished(
+ ui32 channelId,
+ TVector<NDqProto::TData>&& data,
+ TMaybe<NDqProto::TWatermark>&& watermark,
+ TMaybe<NDqProto::TCheckpoint>&& checkpoint,
+ bool finished,
+ bool changed,
+ const TTaskRunnerActorSensors& sensors = {},
+ TDqTaskRunnerStatsView&& stats = {})
: Sensors(sensors)
, Stats(std::move(stats))
, ChannelId(channelId)
, Data(std::move(data))
+ , Watermark(std::move(watermark))
, Checkpoint(std::move(checkpoint))
, Finished(finished)
, Changed(changed)
@@ -248,12 +263,24 @@ struct TEvChannelPopFinished
NDq::TDqTaskRunnerStatsView Stats;
const ui32 ChannelId;
+ // The order is Data -> Watermark -> Checkpoint
TVector<NDqProto::TData> Data;
- TMaybe<NDqProto::TCheckpoint> Checkpoint; // checkpoint follows the last data in this->Data (if it is not empty)
+ TMaybe<NDqProto::TWatermark> Watermark;
+ TMaybe<NDqProto::TCheckpoint> Checkpoint;
bool Finished;
bool Changed;
};
+struct TWatermarkRequest {
+ TWatermarkRequest(TVector<ui32>&& channelIds, TInstant watermark)
+ : ChannelIds(std::move(channelIds))
+ , Watermark(watermark) {
+ }
+
+ TVector<ui32> ChannelIds;
+ TInstant Watermark;
+};
+
// Holds info required to inject barriers to outputs
struct TCheckpointRequest {
TCheckpointRequest(TVector<ui32>&& channelIds, TVector<ui32>&& sinkIds, const NDqProto::TCheckpoint& checkpoint)
@@ -272,10 +299,15 @@ struct TEvContinueRun
TEvContinueRun() = default;
- explicit TEvContinueRun(TMaybe<TCheckpointRequest>&& checkpointRequest, bool checkpointOnly)
+ explicit TEvContinueRun(
+ TMaybe<TWatermarkRequest>&& watermarkRequest,
+ TMaybe<TCheckpointRequest>&& checkpointRequest,
+ bool checkpointOnly
+ )
: ChannelId(0)
, MemLimit(0)
, FreeSpace(0)
+ , WatermarkRequest(std::move(watermarkRequest))
, CheckpointRequest(std::move(checkpointRequest))
, CheckpointOnly(checkpointOnly)
{ }
@@ -299,6 +331,7 @@ struct TEvContinueRun
const THashSet<ui32> InputChannels;
ui64 MemLimit;
ui64 FreeSpace;
+ TMaybe<TWatermarkRequest> WatermarkRequest = Nothing();
TMaybe<TCheckpointRequest> CheckpointRequest = Nothing();
bool CheckpointOnly = false;
};
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index c8ac3afd65d..7ac6dafaa6b 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -20,6 +20,7 @@
#include <util/generic/queue.h>
#define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "SelfId: " << SelfId() << ", TxId: " << TxId << ", task: " << TaskId << ". " << stream);
+#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "SelfId: " << SelfId() << ", TxId: " << TxId << ", task: " << TaskId << ". " << stream);
#define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "SelfId: " << SelfId() << ", TxId: " << TxId << ", task: " << TaskId << ". " << stream);
#define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "SelfId: " << SelfId() << ", TxId: " << TxId << ", task: " << TaskId << ". " << stream);
#define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "SelfId: " << SelfId() << ", TxId: " << TxId << ", task: " << TaskId << ". " << stream);
@@ -144,10 +145,21 @@ private:
NYql::NDq::ERunStatus res = ERunStatus::Finished;
THashMap<ui32, ui64> inputChannelFreeSpace;
THashMap<ui32, ui64> sourcesFreeSpace;
+
+ const bool shouldHandleWatermark = ev->Get()->WatermarkRequest.Defined()
+ && ev->Get()->WatermarkRequest->Watermark > TaskRunner->GetWatermark().WatermarkIn;
+
if (!ev->Get()->CheckpointOnly) {
+ if (shouldHandleWatermark) {
+ const auto watermark = ev->Get()->WatermarkRequest->Watermark;
+ LOG_T("Task runner. Inject watermark " << watermark);
+ TaskRunner->SetWatermarkIn(watermark);
+ }
+
res = TaskRunner->Run();
LOG_T("Resume execution, run status: " << res);
}
+
if (res == ERunStatus::PendingInput) {
for (auto& channelId : inputMap) {
inputChannelFreeSpace[channelId] = TaskRunner->GetInputChannel(channelId)->GetFreeSpace();
@@ -158,25 +170,38 @@ private:
}
}
+ auto watermarkInjectedToOutputs = false;
THolder<NDqProto::TMiniKqlProgramState> mkqlProgramState;
- if ((res == ERunStatus::PendingInput || res == ERunStatus::Finished) && ev->Get()->CheckpointRequest.Defined() && ReadyToCheckpoint()) {
- mkqlProgramState = MakeHolder<NDqProto::TMiniKqlProgramState>();
- try {
- mkqlProgramState->SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0);
- NDqProto::TStateData::TData& data = *mkqlProgramState->MutableData()->MutableStateData();
- data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion);
- data.SetBlob(TaskRunner->Save());
- // inject barriers
- // todo:(whcrc) barriers are injected even if source state save failed
- for (const auto& channelId : ev->Get()->CheckpointRequest->ChannelIds) {
- TaskRunner->GetOutputChannel(channelId)->Push(NDqProto::TCheckpoint(ev->Get()->CheckpointRequest->Checkpoint));
+ if (res == ERunStatus::PendingInput || res == ERunStatus::Finished) {
+ if (shouldHandleWatermark) {
+ for (const auto& channelId : ev->Get()->WatermarkRequest->ChannelIds) {
+ NDqProto::TWatermark watermark;
+ watermark.SetTimestampUs(ev->Get()->WatermarkRequest->Watermark.MicroSeconds());
+ TaskRunner->GetOutputChannel(channelId)->Push(std::move(watermark));
}
- for (const auto& sinkId : ev->Get()->CheckpointRequest->SinkIds) {
- TaskRunner->GetSink(sinkId)->Push(NDqProto::TCheckpoint(ev->Get()->CheckpointRequest->Checkpoint));
+
+ watermarkInjectedToOutputs = true;
+ }
+
+ if (ev->Get()->CheckpointRequest.Defined() && ReadyToCheckpoint()) {
+ mkqlProgramState = MakeHolder<NDqProto::TMiniKqlProgramState>();
+ try {
+ mkqlProgramState->SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0);
+ NDqProto::TStateData::TData& data = *mkqlProgramState->MutableData()->MutableStateData();
+ data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion);
+ data.SetBlob(TaskRunner->Save());
+ // inject barriers
+ // todo:(whcrc) barriers are injected even if source state save failed
+ for (const auto& channelId : ev->Get()->CheckpointRequest->ChannelIds) {
+ TaskRunner->GetOutputChannel(channelId)->Push(NDqProto::TCheckpoint(ev->Get()->CheckpointRequest->Checkpoint));
+ }
+ for (const auto& sinkId : ev->Get()->CheckpointRequest->SinkIds) {
+ TaskRunner->GetSink(sinkId)->Push(NDqProto::TCheckpoint(ev->Get()->CheckpointRequest->Checkpoint));
+ }
+ } catch (const std::exception& e) {
+ LOG_E("Failed to save state: " << e.what());
+ mkqlProgramState = nullptr;
}
- } catch (const std::exception& e) {
- LOG_E("Failed to save state: " << e.what());
- mkqlProgramState = nullptr;
}
}
@@ -194,6 +219,7 @@ private:
MemoryQuota ? *MemoryQuota->GetProfileStats() : TDqMemoryQuota::TProfileStats(),
MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : 0,
std::move(mkqlProgramState),
+ watermarkInjectedToOutputs,
ev->Get()->CheckpointRequest.Defined(),
TInstant::Now() - start),
/*flags=*/0,
@@ -281,22 +307,31 @@ private:
}
TVector<NDqProto::TData> chunks;
+ TMaybe<NDqProto::TWatermark> watermark = Nothing();
TMaybe<NDqProto::TCheckpoint> checkpoint = Nothing();
for (;maxChunks && remain > 0 && !isFinished && hasData; maxChunks--, remain -= dataSize) {
NDqProto::TData data;
hasData = channel->Pop(data, remain);
+
+ NDqProto::TWatermark poppedWatermark;
+ bool hasWatermark = channel->Pop(poppedWatermark);
+
NDqProto::TCheckpoint poppedCheckpoint;
bool hasCheckpoint = channel->Pop(poppedCheckpoint);
+
dataSize = data.GetRaw().size();
isFinished = !hasData && channel->IsFinished();
- changed = changed || hasData || hasCheckpoint || (isFinished != wasFinished);
+ changed = changed || hasData || hasWatermark || hasCheckpoint || (isFinished != wasFinished);
if (hasData) {
chunks.emplace_back(std::move(data));
}
+
+ watermark = hasWatermark ? std::move(poppedWatermark) : TMaybe<NDqProto::TWatermark>();
+ checkpoint = hasCheckpoint ? std::move(poppedCheckpoint) : TMaybe<NDqProto::TCheckpoint>();
+
if (hasCheckpoint) {
- checkpoint = std::move(poppedCheckpoint);
ResumeInputs();
break;
}
@@ -307,6 +342,7 @@ private:
new TEvChannelPopFinished(
channelId,
std::move(chunks),
+ std::move(watermark),
std::move(checkpoint),
isFinished,
changed,
diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto
index 5ac9c3fe850..0e7cfbdea19 100644
--- a/ydb/library/yql/dq/proto/dq_tasks.proto
+++ b/ydb/library/yql/dq/proto/dq_tasks.proto
@@ -20,6 +20,14 @@ enum ECheckpointingMode {
CHECKPOINTING_MODE_DISABLED = 1; // Checkpoints are not used in this part of graph (channels, tasks). This is typically when we are deadling with finite part of graph.
}
+// Mode of watermarks support.
+// There can be different watermarks settings
+// in different graph parts.
+enum EWatermarksMode {
+ WATERMARKS_MODE_DEFAULT = 0; // Watermarks are used according to common settings for our type of query.
+ WATERMARKS_MODE_DISABLED = 1; // Watermarks are not used in this part of graph (channels, tasks).
+}
+
message TProgram {
message TSettings {
bool HasMapJoin = 1;
@@ -58,6 +66,7 @@ message TChannel {
bool IsPersistent = 7;
bool InMemory = 8;
ECheckpointingMode CheckpointingMode = 9;
+ EWatermarksMode WatermarksMode = 10;
}
message TUnionAllInput {
@@ -66,6 +75,7 @@ message TUnionAllInput {
message TSourceInput {
string Type = 1;
google.protobuf.Any Settings = 2;
+ EWatermarksMode WatermarksMode = 3;
}
message TSortColumn {
diff --git a/ydb/library/yql/dq/runtime/dq_async_output.cpp b/ydb/library/yql/dq/runtime/dq_async_output.cpp
index 38bd1999baa..5c8b38054ed 100644
--- a/ydb/library/yql/dq/runtime/dq_async_output.cpp
+++ b/ydb/library/yql/dq/runtime/dq_async_output.cpp
@@ -11,7 +11,7 @@ namespace {
class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
struct TValueDesc {
- std::variant<NUdf::TUnboxedValue, NDqProto::TCheckpoint> Value;
+ std::variant<NUdf::TUnboxedValue, NDqProto::TWatermark, NDqProto::TCheckpoint> Value;
ui64 EstimatedSize;
TValueDesc(NUdf::TUnboxedValue&& value, ui64 size)
@@ -20,6 +20,12 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer {
{
}
+ TValueDesc(NDqProto::TWatermark&& watermark, ui64 size)
+ : Value(std::move(watermark))
+ , EstimatedSize(size)
+ {
+ }
+
TValueDesc(NDqProto::TCheckpoint&& checkpoint, ui64 size)
: Value(std::move(checkpoint))
, EstimatedSize(size)
@@ -61,6 +67,14 @@ public:
ReportChunkIn();
}
+ void Push(NDqProto::TWatermark&& watermark) override {
+ const ui64 bytesSize = watermark.ByteSize();
+ Values.emplace_back(std::move(watermark), bytesSize);
+ EstimatedStoredBytes += bytesSize;
+
+ ReportChunkIn();
+ }
+
void Push(NDqProto::TCheckpoint&& checkpoint) override {
const ui64 bytesSize = checkpoint.ByteSize();
Values.emplace_back(std::move(checkpoint), bytesSize);
@@ -105,6 +119,21 @@ public:
return usedBytes;
}
+ bool Pop(NDqProto::TWatermark& watermark) override {
+ if (!Values.empty() && std::holds_alternative<NDqProto::TWatermark>(Values.front().Value)) {
+ watermark = std::move(std::get<NDqProto::TWatermark>(Values.front().Value));
+ const auto size = Values.front().EstimatedSize;
+ Y_VERIFY(EstimatedStoredBytes >= size);
+ EstimatedStoredBytes -= size;
+ Values.pop_front();
+
+ ReportChunkOut(1, size);
+
+ return true;
+ }
+ return false;
+ }
+
bool Pop(NDqProto::TCheckpoint& checkpoint) override {
if (!Values.empty() && std::holds_alternative<NDqProto::TCheckpoint>(Values.front().Value)) {
checkpoint = std::move(std::get<NDqProto::TCheckpoint>(Values.front().Value));
diff --git a/ydb/library/yql/dq/runtime/dq_async_output.h b/ydb/library/yql/dq/runtime/dq_async_output.h
index 931957a7208..86e57b6e58c 100644
--- a/ydb/library/yql/dq/runtime/dq_async_output.h
+++ b/ydb/library/yql/dq/runtime/dq_async_output.h
@@ -33,6 +33,9 @@ public:
// Pop data to send. Return estimated size of returned data.
[[nodiscard]]
virtual ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch, ui64 bytes) = 0;
+ // Pop watermark
+ [[nodiscard]]
+ virtual bool Pop(NDqProto::TWatermark& watermark) = 0;
// Pop chechpoint. Checkpoints may be taken from sink even after it is finished.
[[nodiscard]]
virtual bool Pop(NDqProto::TCheckpoint& checkpoint) = 0;
diff --git a/ydb/library/yql/dq/runtime/dq_output.h b/ydb/library/yql/dq/runtime/dq_output.h
index e793ff5231b..c0251410782 100644
--- a/ydb/library/yql/dq/runtime/dq_output.h
+++ b/ydb/library/yql/dq/runtime/dq_output.h
@@ -1,5 +1,6 @@
#pragma once
+#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/minikql/mkql_node.h>
#include <util/datetime/base.h>
@@ -42,6 +43,7 @@ public:
virtual bool IsFull() const = 0;
// can throw TDqChannelStorageException
virtual void Push(NUdf::TUnboxedValue&& value) = 0;
+ virtual void Push(NDqProto::TWatermark&& watermark) = 0;
// Push checkpoint. Checkpoints may be pushed to channel even after it is finished.
virtual void Push(NDqProto::TCheckpoint&& checkpoint) = 0;
virtual void Finish() = 0;
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index 88d23c8b38e..dbc85682740 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -125,6 +125,11 @@ public:
}
}
+ void Push(NDqProto::TWatermark&& watermark) override {
+ YQL_ENSURE(!Watermark);
+ Watermark.ConstructInPlace(std::move(watermark));
+ }
+
void Push(NDqProto::TCheckpoint&& checkpoint) override {
YQL_ENSURE(!Checkpoint);
Checkpoint.ConstructInPlace(std::move(checkpoint));
@@ -209,6 +214,16 @@ public:
}
[[nodiscard]]
+ bool Pop(NDqProto::TWatermark& watermark) override {
+ if (!HasData() && Watermark) {
+ watermark = std::move(*Watermark);
+ Watermark = Nothing();
+ return true;
+ }
+ return false;
+ }
+
+ [[nodiscard]]
bool Pop(NDqProto::TCheckpoint& checkpoint) override {
if (!HasData() && Checkpoint) {
checkpoint = std::move(*Checkpoint);
@@ -313,6 +328,7 @@ private:
ui32 EstimatedRowBytes = 0;
bool Finished = false;
+ TMaybe<NDqProto::TWatermark> Watermark;
TMaybe<NDqProto::TCheckpoint> Checkpoint;
};
@@ -549,6 +565,11 @@ public:
#endif
}
+ void Push(NDqProto::TWatermark&& watermark) override {
+ YQL_ENSURE(!Watermark);
+ Watermark.ConstructInPlace(std::move(watermark));
+ }
+
void Push(NDqProto::TCheckpoint&& checkpoint) override {
YQL_ENSURE(!Checkpoint);
Checkpoint.ConstructInPlace(std::move(checkpoint));
@@ -673,6 +694,16 @@ public:
}
[[nodiscard]]
+ bool Pop(NDqProto::TWatermark& watermark) override {
+ if (!HasData() && Watermark) {
+ watermark = std::move(*Watermark);
+ Watermark = Nothing();
+ return true;
+ }
+ return false;
+ }
+
+ [[nodiscard]]
bool Pop(NDqProto::TCheckpoint& checkpoint) override {
if (!HasData() && Checkpoint) {
checkpoint = std::move(*Checkpoint);
@@ -830,6 +861,7 @@ private:
bool Finished = false;
+ TMaybe<NDqProto::TWatermark> Watermark;
TMaybe<NDqProto::TCheckpoint> Checkpoint;
};
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.h b/ydb/library/yql/dq/runtime/dq_output_channel.h
index 0adee34614e..8c74f3b71b6 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.h
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.h
@@ -52,6 +52,9 @@ public:
// can throw TDqChannelStorageException
[[nodiscard]]
virtual bool Pop(NDqProto::TData& data, ui64 bytes) = 0;
+ // Pop watermark.
+ [[nodiscard]]
+ virtual bool Pop(NDqProto::TWatermark& watermark) = 0;
// Pop chechpoint. Checkpoints may be taken from channel even after it is finished.
[[nodiscard]]
virtual bool Pop(NDqProto::TCheckpoint& checkpoint) = 0;
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 0e36cd68d43..540db4aae17 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -684,10 +684,14 @@ public:
return TaskHasEffects;
}
- void SetWatermark(TInstant time) {
+ void SetWatermarkIn(TInstant time) override {
Watermark.WatermarkIn = std::move(time);
}
+ const NKikimr::NMiniKQL::TWatermark& GetWatermark() const override {
+ return Watermark;
+ }
+
IDqInputChannel::TPtr GetInputChannel(ui64 channelId) override {
auto ptr = InputChannels.FindPtr(channelId);
YQL_ENSURE(ptr, "task: " << TaskId << " does not have input channelId: " << channelId);
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index 710f4e5cf5a..9f24d82fc27 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -12,8 +12,9 @@
#include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h>
#include <ydb/library/yql/minikql/mkql_alloc.h>
#include <ydb/library/yql/minikql/mkql_function_registry.h>
-#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/mkql_node_visitor.h>
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/minikql/mkql_watermark.h>
#include <library/cpp/monlib/metrics/histogram_collector.h>
@@ -326,6 +327,9 @@ public:
[[nodiscard]]
virtual TString Save() const = 0;
virtual void Load(TStringBuf in) = 0;
+
+ virtual void SetWatermarkIn(TInstant time) = 0;
+ virtual const NKikimr::NMiniKQL::TWatermark& GetWatermark() const = 0;
};
TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings,
diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
index 780f5009036..8cb87b01558 100644
--- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h
+++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h
@@ -74,6 +74,7 @@ struct TChannel {
ui32 DstInputIndex = 0;
bool InMemory = true;
NDqProto::ECheckpointingMode CheckpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT;
+ NDqProto::EWatermarksMode WatermarksMode = NDqProto::WATERMARKS_MODE_DISABLED;
};
using TChannelList = TVector<ui64>;
@@ -117,6 +118,7 @@ struct TTaskInput {
TChannelList Channels;
TMaybe<::google::protobuf::Any> SourceSettings;
TString SourceType;
+ NYql::NDqProto::EWatermarksMode WatermarksMode = NYql::NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED;
TInputMeta Meta;
TMaybe<TTransform> Transform;
@@ -167,6 +169,7 @@ struct TTask {
NActors::TActorId ComputeActorId;
TTaskMeta Meta;
NDqProto::ECheckpointingMode CheckpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT;
+ NDqProto::EWatermarksMode WatermarksMode = NDqProto::WATERMARKS_MODE_DISABLED;
};
template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta>
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
index 585507da050..47df2509ce7 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp
@@ -44,11 +44,11 @@ public:
ui64 intervalHopCount,
ui64 delayHopCount,
bool dataWatermarks,
+ bool watermarkMode,
TComputationContext& ctx,
const THashFunc& hash,
const TEqualsFunc& equal,
- TWatermark& watermark,
- bool watermarkMode)
+ TWatermark& watermark)
: TBase(memInfo)
, Stream(std::move(stream))
, Self(self)
@@ -60,7 +60,7 @@ public:
, Watermark(watermark)
, WatermarkMode(watermarkMode)
{
- if (dataWatermarks) {
+ if (!watermarkMode && dataWatermarks) {
DataWatermarkTracker.emplace(TWatermarkTracker(delayHopCount * hopTime, hopTime));
}
}
@@ -238,11 +238,11 @@ public:
auto& keyState = GetOrCreateKeyState(key, WatermarkMode ? GetWatermark().MicroSeconds() / HopTime : hopIndex);
if (hopIndex < keyState.HopIndex) {
- ++EarlyEventsThrown;
+ ++LateEventsThrown;
continue;
}
if (WatermarkMode && (hopIndex >= keyState.HopIndex + DelayHopCount + IntervalHopCount)) {
- ++LateEventsThrown;
+ ++EarlyEventsThrown;
continue;
}
@@ -429,10 +429,10 @@ public:
IComputationNode* interval,
IComputationNode* delay,
IComputationNode* dataWatermarks,
+ IComputationNode* watermarkMode,
TType* keyType,
TType* stateType,
- TWatermark& watermark,
- bool watermarkMode)
+ TWatermark& watermark)
: TBaseComputation(mutables)
, Stream(stream)
, Item(item)
@@ -454,6 +454,7 @@ public:
, Interval(interval)
, Delay(delay)
, DataWatermarks(dataWatermarks)
+ , WatermarkMode(watermarkMode)
, KeyType(keyType)
, StateType(stateType)
, KeyPacker(mutables)
@@ -462,7 +463,6 @@ public:
, IsTuple(false)
, UseIHash(false)
, Watermark(watermark)
- , WatermarkMode(watermarkMode)
{
Stateless = false;
bool encoded;
@@ -475,6 +475,7 @@ public:
const auto interval = Interval->GetValue(ctx).Get<i64>();
const auto delay = Delay->GetValue(ctx).Get<i64>();
const auto dataWatermarks = DataWatermarks->GetValue(ctx).Get<bool>();
+ const auto watermarkMode = WatermarkMode->GetValue(ctx).Get<bool>();
// TODO: move checks from here
MKQL_ENSURE(hopTime > 0, "hop must be positive");
@@ -489,10 +490,10 @@ public:
return ctx.HolderFactory.Create<TStreamValue>(Stream->GetValue(ctx), this, (ui64)hopTime,
(ui64)intervalHopCount, (ui64)delayHopCount,
- dataWatermarks, ctx,
+ dataWatermarks, watermarkMode, ctx,
TValueHasher(KeyTypes, IsTuple, UseIHash ? MakeHashImpl(KeyType) : nullptr),
TValueEqual(KeyTypes, IsTuple, UseIHash ? MakeEquateImpl(KeyType) : nullptr),
- Watermark, WatermarkMode);
+ Watermark);
}
NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const override {
@@ -532,6 +533,7 @@ private:
DependsOn(Interval);
DependsOn(Delay);
DependsOn(DataWatermarks);
+ DependsOn(WatermarkMode);
}
IComputationNode* const Stream;
@@ -557,6 +559,7 @@ private:
IComputationNode* const Interval;
IComputationNode* const Delay;
IComputationNode* const DataWatermarks;
+ IComputationNode* const WatermarkMode;
TType* const KeyType;
TType* const StateType;
@@ -567,13 +570,12 @@ private:
bool IsTuple;
bool UseIHash;
TWatermark& Watermark;
- bool WatermarkMode;
};
}
-IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx, TWatermark& watermark, bool watermarkMode) {
- MKQL_ENSURE(callable.GetInputsCount() == 20, "Expected 20 args");
+IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx, TWatermark& watermark) {
+ MKQL_ENSURE(callable.GetInputsCount() == 21, "Expected 21 args");
auto hasSaveLoad = !callable.GetInput(12).GetStaticType()->IsVoid();
@@ -604,6 +606,7 @@ IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNo
auto interval = LocateNode(ctx.NodeLocator, callable, 17);
auto delay = LocateNode(ctx.NodeLocator, callable, 18);
auto dataWatermarks = LocateNode(ctx.NodeLocator, callable, 19);
+ auto watermarkMode = LocateNode(ctx.NodeLocator, callable, 20);
auto item = LocateExternalNode(ctx.NodeLocator, callable, 1);
auto key = LocateExternalNode(ctx.NodeLocator, callable, 2);
@@ -620,7 +623,7 @@ IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNo
return new TMultiHoppingCoreWrapper(ctx.Mutables,
stream, item, key, state, state2, time, inSave, inLoad, keyExtract,
outTime, outInit, outUpdate, outSave, outLoad, outMerge, outFinish,
- hop, interval, delay, dataWatermarks, keyType, stateType, watermark, watermarkMode);
+ hop, interval, delay, dataWatermarks, watermarkMode, keyType, stateType, watermark);
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h
index 02286aab567..603ea9207ae 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h
@@ -1,16 +1,12 @@
#pragma once
#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/library/yql/minikql/mkql_watermark.h>
namespace NKikimr {
namespace NMiniKQL {
-
-struct TWatermark {
- TInstant WatermarkIn;
-};
-
-IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx, TWatermark& watermark, bool watermarkMode = false);
+IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx, TWatermark& watermark);
}
}
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp
index 8bc4a67c2cf..7959e5e601a 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp
@@ -175,7 +175,8 @@ namespace {
pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop
pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&interval, sizeof(interval))), // interval
pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&delay, sizeof(delay))), // delay
- pgmBuilder.NewDataLiteral<bool>(dataWatermarks) // dataWatermarks
+ pgmBuilder.NewDataLiteral<bool>(dataWatermarks), // dataWatermarks
+ pgmBuilder.NewDataLiteral<bool>(false)
);
auto graph = setup.BuildGraph(pgmReturn, {streamNode});
diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
index 27517269fe4..61a1ea44aea 100644
--- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp
@@ -64,12 +64,12 @@ namespace {
return CreateDeterministicTimeProvider(10000000);
}
- TComputationNodeFactory GetAuxCallableFactory(TWatermark& watermark, bool watermarkMode = false) {
- return [&watermark, watermarkMode](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
+ TComputationNodeFactory GetAuxCallableFactory(TWatermark& watermark) {
+ return [&watermark](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
if (callable.GetType()->GetName() == "MyStream") {
return new TExternalComputationNode(ctx.Mutables);
} else if (callable.GetType()->GetName() == "MultiHoppingCore") {
- return WrapMultiHoppingCore(callable, ctx, watermark, watermarkMode);
+ return WrapMultiHoppingCore(callable, ctx, watermark);
}
return GetBuiltinFactory()(callable, ctx);
@@ -92,7 +92,7 @@ namespace {
THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) {
Explorer.Walk(pgm.GetNode(), *Env);
- TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(Watermark, WatermarkMode),
+ TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(Watermark),
FunctionRegistry.Get(),
NUdf::EValidateMode::None, NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi,
StatsResgistry.Get());
@@ -217,7 +217,8 @@ namespace {
pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop
pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&interval, sizeof(interval))), // interval
pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&delay, sizeof(delay))), // delay
- pgmBuilder.NewDataLiteral<bool>(dataWatermarks) // dataWatermarks
+ pgmBuilder.NewDataLiteral<bool>(dataWatermarks),
+ pgmBuilder.NewDataLiteral<bool>(setup.WatermarkMode)
);
auto graph = setup.BuildGraph(pgmReturn, {streamNode});
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 4dfedcc84cd..3fcadf95362 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -4759,7 +4759,8 @@ TRuntimeNode TProgramBuilder::MultiHoppingCore(TRuntimeNode list,
const TUnaryLambda& load,
const TBinaryLambda& merge,
const TTernaryLambda& finish,
- TRuntimeNode hop, TRuntimeNode interval, TRuntimeNode delay, TRuntimeNode dataWatermarks)
+ TRuntimeNode hop, TRuntimeNode interval, TRuntimeNode delay,
+ TRuntimeNode dataWatermarks, TRuntimeNode watermarksMode)
{
if constexpr (RuntimeVersion < 22U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
@@ -4829,6 +4830,7 @@ TRuntimeNode TProgramBuilder::MultiHoppingCore(TRuntimeNode list,
callableBuilder.Add(interval);
callableBuilder.Add(delay);
callableBuilder.Add(dataWatermarks);
+ callableBuilder.Add(watermarksMode);
return TRuntimeNode(callableBuilder.Build(), false);
}
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index 72112c9d1e5..d514255b2d0 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -441,7 +441,8 @@ public:
const TUnaryLambda& load,
const TBinaryLambda& merge,
const TTernaryLambda& finish,
- TRuntimeNode hop, TRuntimeNode interval, TRuntimeNode delay, TRuntimeNode dataWatermarks);
+ TRuntimeNode hop, TRuntimeNode interval, TRuntimeNode delay,
+ TRuntimeNode dataWatermarks, TRuntimeNode watermarksMode);
TRuntimeNode Chopper(TRuntimeNode flow, const TUnaryLambda& keyExtractor, const TBinaryLambda& groupSwitch, const TBinaryLambda& groupHandler);
diff --git a/ydb/library/yql/minikql/mkql_watermark.h b/ydb/library/yql/minikql/mkql_watermark.h
new file mode 100644
index 00000000000..8a5805bd536
--- /dev/null
+++ b/ydb/library/yql/minikql/mkql_watermark.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include "util/datetime/base.h"
+
+namespace NKikimr {
+namespace NMiniKQL {
+
+struct TWatermark {
+ TInstant WatermarkIn;
+};
+
+}}
diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp
index aaebb97f036..94c558d868d 100644
--- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp
+++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp
@@ -91,7 +91,7 @@ private:
}
}
- i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 /*freeSpace*/) final {
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>&, bool& finished, i64 /*freeSpace*/) final {
if (Result) {
const auto size = Result->size();
buffer.emplace_back(NKikimr::NMiniKQL::MakeString(std::string_view(*Result)));
diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
index ce81e578174..2025958d96c 100644
--- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
+++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
@@ -1648,9 +1648,11 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
return MkqlBuildLambda(*node.Child(12), ctx, {key, state, time});
};
+ const auto watermarksMode = ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(*node.Child(13), NUdf::EDataSlot::Bool));
+
return ctx.ProgramBuilder.MultiHoppingCore(
stream, keyExtractor, timeExtractor, init, update, save, load, merge, finish,
- hop, interval, delay, dataWatermarks);
+ hop, interval, delay, dataWatermarks, watermarksMode);
});
AddCallable("ToDict", [](const TExprNode& node, TMkqlBuildContext& ctx) {
diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
index f036dee78f4..3d680abf16e 100644
--- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
+++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp
@@ -91,8 +91,12 @@ TFakeCASetup::TFakeCASetup()
NActors::TMailboxType::Simple,
0));
+ Runtime->SetLogBackend(CreateStderrBackend());
+
TAutoPtr<NKikimr::TAppPrepare> app = new NKikimr::TAppPrepare();
Runtime->Initialize(app->Unwrap());
+
+ Runtime->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::EPriority::PRI_TRACE);
}
TFakeCASetup::~TFakeCASetup() {
diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
index 12a1af40af1..a4f6c8ea9e5 100644
--- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
+++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h
@@ -182,44 +182,64 @@ struct TFakeCASetup {
~TFakeCASetup();
template<typename T>
- std::vector<T> AsyncInputRead(const TReadValueParser<T> parser, i64 freeSpace = 12345) {
- std::vector<T> result;
- Execute([&result, &parser, freeSpace](TFakeActor& actor) {
+ std::vector<std::variant<T, TInstant>> AsyncInputRead(
+ const TReadValueParser<T> parser,
+ NThreading::TFuture<void>& nextDataFutureOut,
+ i64 freeSpace = 12345)
+ {
+ std::vector<std::variant<T, TInstant>> result;
+ NThreading::TFuture<bool> nextDataFuture;
+ Execute([&result, &parser, freeSpace, &nextDataFutureOut, this](TFakeActor& actor) {
+ TMaybe<TInstant> watermark;
NKikimr::NMiniKQL::TUnboxedValueVector buffer;
bool finished = false;
- actor.DqAsyncInput->GetAsyncInputData(buffer, finished, freeSpace);
+ actor.DqAsyncInput->GetAsyncInputData(buffer, watermark, finished, freeSpace);
for (const auto& uv : buffer) {
for (const auto item : parser(uv)) {
result.emplace_back(item);
}
}
+
+ if (watermark) {
+ result.emplace_back(*watermark);
+ }
+
+ nextDataFutureOut = AsyncInputPromises.NewAsyncInputDataArrived.GetFuture();
});
return result;
}
template<typename T>
- std::vector<T> AsyncInputReadUntil(
+ std::vector<std::variant<T, TInstant>> AsyncInputReadUntil(
const TReadValueParser<T> parser,
ui64 size,
i64 eachReadFreeSpace = 1000,
- TDuration timeout = TDuration::Seconds(10))
+ TDuration timeout = TDuration::Seconds(30))
{
- std::vector<T> result;
+ std::vector<std::variant<T, TInstant>> result;
+ TInstant startedAt = TInstant::Now();
DoWithRetry([&](){
- auto batch = AsyncInputRead<T>(parser, eachReadFreeSpace);
+ NThreading::TFuture<void> nextDataFuture;
+ auto batch = AsyncInputRead<T>(parser, nextDataFuture, eachReadFreeSpace);
for (const auto& item : batch) {
result.emplace_back(item);
}
+ if (TInstant::Now() > startedAt + timeout) {
+ return;
+ }
+
if (result.size() < size) {
- AsyncInputPromises.NewAsyncInputDataArrived.GetFuture().Wait(timeout);
+ nextDataFuture.Wait(timeout);
ythrow yexception() << "Not enough data";
}
},
- TRetryOptions(3),
- false);
+ TRetryOptions(std::numeric_limits<ui32>::max()),
+ true);
+
+ UNIT_ASSERT_EQUAL_C(result.size(), size, "Waited for " << size << " items but only " << result.size() << " received");
return result;
}
diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
index f012a43ed10..d22597522dd 100644
--- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp
@@ -265,7 +265,7 @@ private:
void OnFailure(TEvDqFailure::TPtr& ev, const NActors::TActorContext&) {
if (!Finished) {
YQL_LOG_CTX_ROOT_SCOPE(TraceId);
- YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__
+ YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__
<< ", status=" << static_cast<int>(ev->Get()->Record.GetStatusCode())
<< ", issues size=" << ev->Get()->Record.IssuesSize()
<< ", sender=" << ev->Sender;
diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
index 69416229bc8..1893f63e41c 100644
--- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp
+++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp
@@ -268,6 +268,7 @@ private:
.InputDesc = input,
.InputIndex = static_cast<ui64>(inputId),
.TxId = TraceId,
+ .TaskId = Task.GetId(),
.SecureParams = secureParams,
.TaskParams = taskParams,
.ComputeActorId = SelfId(),
@@ -298,6 +299,7 @@ private:
.OutputDesc = output,
.OutputIndex = static_cast<ui64>(outputId),
.TxId = TraceId,
+ .TaskId = Task.GetId(),
.Callback = this,
.SecureParams = secureParams,
.TaskParams = taskParams,
@@ -556,9 +558,10 @@ private:
continue;
}
auto guard = source.TypeEnv->BindAllocator();
+ TMaybe<TInstant> watermark;
NKikimr::NMiniKQL::TUnboxedValueVector batch;
bool finished = false;
- const i64 space = source.Source->GetAsyncInputData(batch, finished, freeSpace);
+ const i64 space = source.Source->GetAsyncInputData(batch, watermark, finished, freeSpace);
const ui64 index = inputIndex;
if (space <= 0) {
continue;
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
index 25582f97af6..110b78eb312 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp
@@ -48,6 +48,8 @@ TDqConfiguration::TDqConfiguration() {
REGISTER_SETTING(*this, WorkerFilter);
REGISTER_SETTING(*this, _EnablePrecompute);
REGISTER_SETTING(*this, EnableDqReplicate);
+ REGISTER_SETTING(*this, WatermarksMode);
+ REGISTER_SETTING(*this, WatermarksGranularityMs);
}
} // namespace NYql
diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
index cf12ecf9115..dd4b1b35077 100644
--- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h
+++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h
@@ -72,7 +72,8 @@ struct TDqSettings {
NCommon::TConfSetting<bool, false> _EnablePrecompute;
NCommon::TConfSetting<bool, false> EnableDqReplicate;
NCommon::TConfSetting<bool, false> _EnableGraceJoin;
-
+ NCommon::TConfSetting<TString, false> WatermarksMode;
+ NCommon::TConfSetting<ui64, false> WatermarksGranularityMs;
NCommon::TConfSetting<TString, false> WorkerFilter;
@@ -112,6 +113,8 @@ struct TDqSettings {
SAVE_SETTING(_FallbackOnRuntimeErrors);
SAVE_SETTING(WorkerFilter);
SAVE_SETTING(ComputeActorType);
+ SAVE_SETTING(WatermarksMode);
+ SAVE_SETTING(WatermarksGranularityMs);
#undef SAVE_SETTING
}
diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
index a29b31cb668..c60a2290c5d 100644
--- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
+++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp
@@ -171,6 +171,10 @@ private:
const auto loadLambda = BuildLoadHopLambda(aggregate, ctx);
const auto mergeLambda = BuildMergeHopLambda(aggregate, ctx);
const auto finishLambda = BuildFinishHopLambda(aggregate, ctx);
+ const auto watermarkMode = BuildWatermarkMode(aggregate, ctx);
+ if (!watermarkMode) {
+ return nullptr;
+ }
const auto streamArg = Build<TCoArgument>(ctx, pos).Name("stream").Done();
auto multiHoppingCoreBuilder = Build<TCoMultiHoppingCore>(ctx, pos)
@@ -185,7 +189,8 @@ private:
.MergeHandler(mergeLambda)
.FinishHandler(finishLambda)
.SaveHandler(saveLambda)
- .LoadHandler(loadLambda);
+ .LoadHandler(loadLambda)
+ .WatermarkMode(*watermarkMode);
if (Config->AnalyticsHopping.Get().GetOrElse(false)) {
return Build<TCoPartitionsByKeys>(ctx, node.Pos())
@@ -781,6 +786,17 @@ private:
.Ptr();
}
+ TMaybe<TExprNode::TPtr> BuildWatermarkMode(const TCoAggregate& aggregate, TExprContext& ctx) {
+ const auto analyticsMode = Config->AnalyticsHopping.Get().GetOrElse(false);
+ const bool enableWatermarks = !analyticsMode && Config->WatermarksMode.Get() == "default";
+ if (enableWatermarks && Config->ComputeActorType.Get() != "async") {
+ ctx.AddError(TIssue(ctx.GetPosition(aggregate.Pos()), "Watermarks should be used only with async compute actor"));
+ return Nothing();
+ }
+
+ return ctx.NewAtom(aggregate.Pos(), ToString(enableWatermarks));
+ }
+
private:
TDqConfiguration::TPtr Config;
TTypeAnnotationContext& TypesCtx;
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
index 06c81121921..bc195decbc5 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
@@ -259,7 +259,7 @@ namespace NYql::NDqs {
SourceTaskID = resultTask.Id;
}
- BuildCheckpointingMode();
+ BuildCheckpointingAndWatermarksMode(true, settings->WatermarksMode.Get().GetOrElse("") == "default");
return TasksGraph.GetTasks().size();
}
@@ -279,7 +279,7 @@ namespace NYql::NDqs {
return sourceType == "PqSource"; // Now it is the only infinite source type. Others are finite.
}
- void TDqsExecutionPlanner::BuildCheckpointingMode() {
+ void TDqsExecutionPlanner::BuildCheckpointingAndWatermarksMode(bool enableCheckpoints, bool enableWatermarks) {
std::stack<TDqsTasksGraph::TTaskType*> tasksStack;
std::vector<bool> processedTasks(TasksGraph.GetTasks().size());
for (TDqsTasksGraph::TTaskType& task : TasksGraph.GetTasks()) {
@@ -312,33 +312,62 @@ namespace NYql::NDqs {
continue;
}
- // Current task has all inputs processed, so determine its checkpointing mode now.
+ // Current task has all inputs processed, so determine its checkpointing and watermarks mode now.
NDqProto::ECheckpointingMode checkpointingMode = NDqProto::CHECKPOINTING_MODE_DISABLED;
- for (const auto& input : task.Inputs) {
- if (input.SourceType) {
- if (IsInfiniteSourceType(input.SourceType)) {
- checkpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT;
- break;
- }
- } else {
- for (ui64 channelId : input.Channels) {
- const NDq::TChannel& channel = TasksGraph.GetChannel(channelId);
- if (channel.CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED) {
+ if (enableCheckpoints) {
+ for (const auto& input : task.Inputs) {
+ if (input.SourceType) {
+ if (IsInfiniteSourceType(input.SourceType)) {
checkpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT;
break;
}
+ } else {
+ for (ui64 channelId : input.Channels) {
+ const NDq::TChannel& channel = TasksGraph.GetChannel(channelId);
+ if (channel.CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED) {
+ checkpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT;
+ break;
+ }
+ }
+ if (checkpointingMode == NDqProto::CHECKPOINTING_MODE_DEFAULT) {
+ break;
+ }
}
- if (checkpointingMode == NDqProto::CHECKPOINTING_MODE_DEFAULT) {
- break;
+ }
+ }
+
+ NDqProto::EWatermarksMode watermarksMode = NDqProto::WATERMARKS_MODE_DISABLED;
+ if (enableWatermarks) {
+ for (auto& input : task.Inputs) {
+ if (input.SourceType) {
+ if (IsInfiniteSourceType(input.SourceType)) {
+ watermarksMode = NDqProto::WATERMARKS_MODE_DEFAULT;
+ input.WatermarksMode = NDqProto::WATERMARKS_MODE_DEFAULT;
+ break;
+ }
+ } else {
+ for (ui64 channelId : input.Channels) {
+ const NDq::TChannel& channel = TasksGraph.GetChannel(channelId);
+ if (channel.WatermarksMode != NDqProto::WATERMARKS_MODE_DEFAULT) {
+ watermarksMode = NDqProto::WATERMARKS_MODE_DEFAULT;
+ break;
+ }
+ }
+ if (watermarksMode == NDqProto::WATERMARKS_MODE_DEFAULT) {
+ break;
+ }
}
}
}
// Apply mode to task and its outputs.
task.CheckpointingMode = checkpointingMode;
+ task.WatermarksMode = watermarksMode;
for (const auto& output : task.Outputs) {
for (ui64 channelId : output.Channels) {
- TasksGraph.GetChannel(channelId).CheckpointingMode = checkpointingMode;
+ auto& channel = TasksGraph.GetChannel(channelId);
+ channel.CheckpointingMode = checkpointingMode;
+ channel.WatermarksMode = watermarksMode;
}
}
@@ -398,6 +427,7 @@ namespace NYql::NDqs {
auto* sourceProto = inputDesc.MutableSource();
*sourceProto->MutableSettings() = *input.SourceSettings;
sourceProto->SetType(input.SourceType);
+ sourceProto->SetWatermarksMode(input.WatermarksMode);
} else {
FillInputDesc(inputDesc, input);
}
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.h b/ydb/library/yql/providers/dq/planner/execution_planner.h
index d647665c756..4049cff1a57 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.h
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.h
@@ -72,7 +72,7 @@ namespace NYql::NDqs {
void FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output);
void GatherPhyMapping(THashMap<std::tuple<TString, TString>, TString>& clusters, THashMap<std::tuple<TString, TString, TString>, TString>& tables);
- void BuildCheckpointingMode();
+ void BuildCheckpointingAndWatermarksMode(bool enableCheckpoints, bool enableWatermarks);
bool IsEgressTask(const TDqsTasksGraph::TTaskType& task) const;
private:
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index 23a9eca9ff4..56c64e0fe07 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -913,6 +913,11 @@ public:
ythrow yexception() << "unimplemented";
}
+ void Push(NDqProto::TWatermark&& watermark) override {
+ Y_UNUSED(watermark);
+ ythrow yexception() << "unimplemented";
+ }
+
void Push(NDqProto::TCheckpoint&& checkpoint) override {
Y_UNUSED(checkpoint);
ythrow yexception() << "unimplemented";
@@ -956,6 +961,10 @@ public:
}
}
+ bool Pop(NDqProto::TWatermark&) override {
+ return false;
+ }
+
bool Pop(NDqProto::TCheckpoint&) override {
return false;
}
@@ -1179,6 +1188,11 @@ public:
Y_FAIL("Unimplemented");
}
+ bool Pop(NDqProto::TWatermark& watermark) override {
+ Y_UNUSED(watermark);
+ Y_FAIL("Watermarks are not supported");
+ }
+
bool Pop(NDqProto::TCheckpoint& checkpoint) override {
Y_UNUSED(checkpoint);
Y_FAIL("Checkpoints are not supported");
@@ -1193,6 +1207,11 @@ public:
Y_FAIL("Unimplemented");
}
+ void Push(NDqProto::TWatermark&& watermark) override {
+ Y_UNUSED(watermark);
+ Y_FAIL("Watermarks are not supported");
+ }
+
void Push(NDqProto::TCheckpoint&& checkpoint) override {
Y_UNUSED(checkpoint);
Y_FAIL("Checkpoints are not supported");
@@ -1598,6 +1617,14 @@ public:
ythrow yexception() << "unimplemented";
}
+ void SetWatermarkIn(TInstant) override {
+ ythrow yexception() << "unimplemented";
+ }
+
+ const NKikimr::NMiniKQL::TWatermark& GetWatermark() const override {
+ ythrow yexception() << "unimplemented";
+ }
+
private:
TIntrusivePtr<TTaskRunner> Delegate;
NDqProto::TDqTask Task;
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index 96532f7ba84..b5ef754dd18 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -345,6 +345,7 @@ private:
channelId,
std::move(chunks),
Nothing(),
+ Nothing(),
isFinished,
changed,
GetSensors(response),
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp
index 2b4c8dcb5a8..c7507a91e01 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp
@@ -15,33 +15,55 @@ namespace {
const std::unordered_map<TString, NYql::NDq::TPqMetaExtractor::TPqMetaExtractorLambda> ExtractorsMap = {
{
"_yql_sys_create_time", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
- return NYql::NUdf::TUnboxedValuePod(static_cast<NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>::TLayout>(message.GetCreateTime().MicroSeconds()));
+ using TDataType = NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>;
+ return std::make_pair(
+ NYql::NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetCreateTime().MicroSeconds())),
+ NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize
+ );
}
},
{
"_yql_sys_write_time", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
- return NYql::NUdf::TUnboxedValuePod(static_cast<NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>::TLayout>(message.GetWriteTime().MicroSeconds()));
+ using TDataType = NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>;
+ return std::make_pair(
+ NYql::NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetWriteTime().MicroSeconds())),
+ NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize
+ );
}
},
{
"_yql_sys_partition_id", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
- return NYql::NUdf::TUnboxedValuePod(message.GetPartitionStream()->GetPartitionId());
+ using TDataType = NYql::NUdf::TDataType<ui64>;
+ return std::make_pair(
+ NYql::NUdf::TUnboxedValuePod(message.GetPartitionStream()->GetPartitionId()),
+ NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize
+ );
}
},
{
"_yql_sys_offset", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
- return NYql::NUdf::TUnboxedValuePod(message.GetOffset());
+ using TDataType = NYql::NUdf::TDataType<ui64>;
+ return std::make_pair(
+ NYql::NUdf::TUnboxedValuePod(message.GetOffset()),
+ NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize);
}
},
{
"_yql_sys_message_group_id", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
const auto& data = message.GetMessageGroupId();
- return NKikimr::NMiniKQL::MakeString(NYql::NUdf::TStringRef(data.Data(), data.Size()));
+ return std::make_pair(
+ NKikimr::NMiniKQL::MakeString(NYql::NUdf::TStringRef(data.Data(), data.Size())),
+ data.Size()
+ );
}
},
{
"_yql_sys_seq_no", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){
- return NYql::NUdf::TUnboxedValuePod(static_cast<NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>::TLayout>(message.GetSeqNo()));
+ using TDataType = NYql::NUdf::TDataType<ui64>;
+ return std::make_pair(
+ NYql::NUdf::TUnboxedValuePod(message.GetSeqNo()),
+ NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize
+ );
}
},
};
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.h
index df932bc48f2..d9207757630 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.h
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.h
@@ -12,7 +12,7 @@
namespace NYql::NDq {
struct TPqMetaExtractor {
- using TPqMetaExtractorLambda = std::function<NYql::NUdf::TUnboxedValuePod(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage&)>;
+ using TPqMetaExtractorLambda = std::function<std::pair<NYql::NUdf::TUnboxedValuePod, i64>(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage&)>;
public:
TPqMetaExtractor();
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
index 2c2605d3264..0b25a022da6 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp
@@ -3,6 +3,7 @@
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
+#include <ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h>
#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/dq/common/dq_common.h>
#include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h>
@@ -94,6 +95,7 @@ public:
TDqPqReadActor(
ui64 inputIndex,
const TTxId& txId,
+ ui64 taskId,
const THolderFactory& holderFactory,
NPq::NProto::TDqPqTopicSource&& sourceParams,
NPq::NProto::TDqReadTaskParams&& readParams,
@@ -108,7 +110,7 @@ public:
, BufferSize(bufferSize)
, RangesMode(rangesMode)
, HolderFactory(holderFactory)
- , LogPrefix(TStringBuilder() << "TxId: " << TxId << ", PQ source. ")
+ , LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << taskId << ". PQ source. ")
, Driver(std::move(driver))
, CredentialsProviderFactory(std::move(credentialsProviderFactory))
, SourceParams(std::move(sourceParams))
@@ -121,6 +123,8 @@ public:
for (const auto& fieldName : SourceParams.GetMetadataFields()) {
MetadataFields.emplace_back(fieldName, fieldsExtractor.FindExtractorLambda(fieldName));
}
+
+ InitWatermarkTracker();
}
NYdb::NPersQueue::TPersQueueClientSettings GetPersQueueClientSettings() const {
@@ -190,6 +194,8 @@ public:
}
}
StartingMessageTimestamp = minStartingMessageTs;
+ InitWatermarkTracker();
+
if (ReadSession) {
ReadSession.reset();
GetReadSession();
@@ -228,12 +234,16 @@ private:
)
void Handle(TEvPrivate::TEvSourceDataReady::TPtr&, const TActorContext& ctx) {
+ SRC_LOG_T("Source data ready");
SubscribedOnEvent = false;
ctx.Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
// IActor & IDqComputeActorAsyncInput
void PassAway() override { // Is called from Compute Actor
+ std::queue<TReadyBatch> empty;
+ ReadyBuffer.swap(empty);
+
if (ReadSession) {
ReadSession->Close(TDuration::Zero());
ReadSession.reset();
@@ -242,38 +252,71 @@ private:
TActor<TDqPqReadActor>::PassAway();
}
- i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool&, i64 freeSpace) override {
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>& watermark, bool&, i64 freeSpace) override {
+ SRC_LOG_T("GetAsyncInputData freeSpace = " << freeSpace);
+
+ i64 usedSpace = 0;
+ if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) {
+ return usedSpace;
+ }
+
auto events = GetReadSession().GetEvents(false, TMaybe<size_t>(), static_cast<size_t>(Max<i64>(freeSpace, 0)));
- ui32 batchSize = 0;
+ ui32 batchItemsEstimatedCount = 0;
for (auto& event : events) {
if (const auto* val = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&event)) {
- batchSize += val->GetMessages().size();
+ batchItemsEstimatedCount += val->GetMessages().size();
}
}
- buffer.clear();
- buffer.reserve(batchSize);
- i64 usedSpace = 0;
for (auto& event : events) {
- std::visit(TPQEventProcessor{*this, buffer, usedSpace, LogPrefix}, event);
+ std::visit(TPQEventProcessor{*this, batchItemsEstimatedCount, LogPrefix}, event);
}
- SubscribeOnNextEvent();
+ if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) {
+ return usedSpace;
+ }
- return usedSpace;
+ watermark = Nothing();
+ buffer.clear();
+ return 0;
}
private:
- NYdb::NPersQueue::TReadSessionSettings GetReadSessionSettings() const {
- NYdb::NPersQueue::TTopicReadSettings topicReadSettings;
- topicReadSettings.Path(SourceParams.GetTopicPath());
+ std::vector<ui64> GetPartitionsToRead() const {
+ std::vector<ui64> res;
+
ui64 currentPartition = ReadParams.GetPartitioningParams().GetEachTopicPartitionGroupId();
do {
- topicReadSettings.AppendPartitionGroupIds(currentPartition + 1); // 1-based.
+ res.emplace_back(currentPartition + 1); // 1-based.
currentPartition += ReadParams.GetPartitioningParams().GetDqPartitionsCount();
} while (currentPartition < ReadParams.GetPartitioningParams().GetTopicPartitionsCount());
+ return res;
+ }
+
+ void InitWatermarkTracker() {
+ SRC_LOG_D("Watermarks enabled: " << SourceParams.GetWatermarks().GetEnabled() << " granularity: "
+ << SourceParams.GetWatermarks().GetGranularityUs() << " microseconds");
+
+ if (!SourceParams.GetWatermarks().GetEnabled()) {
+ return;
+ }
+
+ WatermarkTracker = std::make_unique<TDqSourceWatermarkTracker<TPartitionKey>>(
+ TDuration::MicroSeconds(SourceParams.GetWatermarks().GetGranularityUs()),
+ StartingMessageTimestamp,
+ GetPartitionsToRead().size() // TODO: for the internal LB there is a problem here. See YQ-1384
+ );
+ }
+
+ NYdb::NPersQueue::TReadSessionSettings GetReadSessionSettings() const {
+ NYdb::NPersQueue::TTopicReadSettings topicReadSettings;
+ topicReadSettings.Path(SourceParams.GetTopicPath());
+ for (const auto partitionId : GetPartitionsToRead()) {
+ topicReadSettings.AppendPartitionGroupIds(partitionId);
+ }
+
return NYdb::NPersQueue::TReadSessionSettings()
.DisableClusterDiscovery(SourceParams.GetClusterType() == NPq::NProto::DataStreams)
.AppendTopics(topicReadSettings)
@@ -283,18 +326,6 @@ private:
.RangesMode(RangesMode);
}
- void UpdateStateWithNewReadData(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) {
- if (event.GetMessages().empty()) {
- return;
- }
-
- assert(MaxElementBy(event.GetMessages(), [](const auto& message){ return message.GetOffset(); })
- ->GetOffset() == event.GetMessages().back().GetOffset());
-
- const auto maxOffset = event.GetMessages().back().GetOffset();
- PartitionToOffset[MakePartitionKey(event.GetPartitionStream())] = maxOffset + 1; // Next offset to read from.
- }
-
static TPartitionKey MakePartitionKey(const NYdb::NPersQueue::TPartitionStream::TPtr& partitionStreamPtr) {
return std::make_pair(partitionStreamPtr->GetCluster(), partitionStreamPtr->GetPartitionId());
}
@@ -309,8 +340,55 @@ private:
}
}
+ struct TReadyBatch {
+ public:
+ TReadyBatch(TMaybe<TInstant> watermark, ui32 dataCapacity) : Watermark(watermark){
+ Data.reserve(dataCapacity);
+ }
+
+ public:
+ TMaybe<TInstant> Watermark;
+ TUnboxedValueVector Data;
+ i64 UsedSpace = 0;
+ THashMap<NYdb::NPersQueue::TPartitionStream::TPtr, TList<std::pair<ui64, ui64>>> OffsetRanges; // [start, end)
+ };
+
+ bool MaybeReturnReadyBatch(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>& watermark, i64& usedSpace) {
+ if (ReadyBuffer.empty()) {
+ SubscribeOnNextEvent();
+ return false;
+ }
+
+ auto& readyBatch = ReadyBuffer.front();
+ buffer.swap(readyBatch.Data);
+ watermark = readyBatch.Watermark;
+ usedSpace = readyBatch.UsedSpace;
+
+ for (const auto& [partitionStream, ranges] : readyBatch.OffsetRanges) {
+ for (const auto& [start, end] : ranges) {
+ CurrentDeferredCommit.Add(partitionStream, start, end);
+ }
+ PartitionToOffset[MakePartitionKey(partitionStream)] = ranges.back().second;
+ }
+
+ ReadyBuffer.pop();
+
+ if (ReadyBuffer.empty()) {
+ SubscribeOnNextEvent();
+ } else {
+ Send(SelfId(), new TEvPrivate::TEvSourceDataReady());
+ }
+
+ SRC_LOG_T("Return ready batch."
+ << " DataCount = " << buffer.size()
+ << " Watermark = " << (watermark ? ToString(*watermark) : "none")
+ << " Used space = " << usedSpace);
+ return true;
+ }
+
struct TPQEventProcessor {
void operator()(NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) {
+ const auto partitionKey = MakePartitionKey(event.GetPartitionStream());
for (const auto& message : event.GetMessages()) {
const TString& data = message.GetData();
@@ -322,24 +400,19 @@ private:
continue;
}
- NUdf::TUnboxedValuePod item;
- if (Self.MetadataFields.empty()) {
- item = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size()));
- } else {
- NUdf::TUnboxedValue* itemPtr;
- item = Self.HolderFactory.CreateDirectArrayHolder(Self.MetadataFields.size() + 1, itemPtr);
- *(itemPtr++) = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size()));
+ auto [item, size] = CreateItem(message);
- for (const auto& [name, extractor] : Self.MetadataFields) {
- *(itemPtr++) = extractor(message);
- }
- }
+ auto& curBatch = GetActiveBatch(partitionKey, message.GetWriteTime());
+ curBatch.Data.emplace_back(std::move(item));
+ curBatch.UsedSpace += size;
- Batch.emplace_back(item);
- UsedSpace += data.Size();
+ auto& offsets = curBatch.OffsetRanges[message.GetPartitionStream()];
+ if (!offsets.empty() && offsets.back().second == message.GetOffset()) {
+ offsets.back().second = message.GetOffset() + 1;
+ } else {
+ offsets.emplace_back(message.GetOffset(), message.GetOffset() + 1);
+ }
}
- Self.UpdateStateWithNewReadData(event);
- Self.CurrentDeferredCommit.Add(event);
}
void operator()(NYdb::NPersQueue::TSessionClosedEvent& ev) {
@@ -366,9 +439,56 @@ private:
void operator()(NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent&) { }
+ TReadyBatch& GetActiveBatch(const TPartitionKey& partitionKey, TInstant time) {
+ if (Y_UNLIKELY(Self.ReadyBuffer.empty())) {
+ Self.ReadyBuffer.emplace(Nothing(), BatchCapacity);
+ }
+
+ TReadyBatch& activeBatch = Self.ReadyBuffer.back();
+
+ if (!Self.WatermarkTracker) {
+ // Watermark tracker disabled => there is no way more than one batch will be used
+ return activeBatch;
+ }
+
+ const auto maybeNewWatermark = Self.WatermarkTracker->NotifyNewPartitionTime(partitionKey, time);
+ if (!maybeNewWatermark) {
+ // Watermark wasn't moved => use current active batch
+ return activeBatch;
+ }
+
+ SRC_LOG_D("New watermark " << *maybeNewWatermark << " was generated");
+ activeBatch.Watermark = maybeNewWatermark; // Write watermark to current batch
+
+ return Self.ReadyBuffer.emplace(Nothing(), BatchCapacity); // And open new batch
+ }
+
+ std::pair<NUdf::TUnboxedValuePod, i64> CreateItem(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message) {
+ const TString& data = message.GetData();
+
+ i64 usedSpace = 0;
+ NUdf::TUnboxedValuePod item;
+ if (Self.MetadataFields.empty()) {
+ item = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size()));
+ usedSpace += data.Size();
+ } else {
+ NUdf::TUnboxedValue* itemPtr;
+ item = Self.HolderFactory.CreateDirectArrayHolder(Self.MetadataFields.size() + 1, itemPtr);
+ *(itemPtr++) = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size()));
+ usedSpace += data.Size();
+
+ for (const auto& [name, extractor] : Self.MetadataFields) {
+ auto [ub, size] = extractor(message);
+ *(itemPtr++) = std::move(ub);
+ usedSpace += size;
+ }
+ }
+
+ return std::make_pair(item, usedSpace);
+ }
+
TDqPqReadActor& Self;
- TUnboxedValueVector& Batch;
- i64& UsedSpace;
+ ui32 BatchCapacity;
const TString& LogPrefix;
};
@@ -393,12 +513,15 @@ private:
NYdb::NPersQueue::TDeferredCommit CurrentDeferredCommit;
bool SubscribedOnEvent = false;
std::vector<std::tuple<TString, TPqMetaExtractor::TPqMetaExtractorLambda>> MetadataFields;
+ std::queue<TReadyBatch> ReadyBuffer;
+ std::unique_ptr<TDqSourceWatermarkTracker<TPartitionKey>> WatermarkTracker;
};
std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
NPq::NProto::TDqPqTopicSource&& settings,
ui64 inputIndex,
TTxId txId,
+ ui64 taskId,
const THashMap<TString, TString>& secureParams,
const THashMap<TString, TString>& taskParams,
NYdb::TDriver driver,
@@ -422,6 +545,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
TDqPqReadActor* actor = new TDqPqReadActor(
inputIndex,
txId,
+ taskId,
holderFactory,
std::move(settings),
std::move(readTaskParamsMsg),
@@ -446,6 +570,7 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv
std::move(settings),
args.InputIndex,
args.TxId,
+ args.TaskId,
args.SecureParams,
args.TaskParams,
driver,
diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
index bc3e5292fae..d8411fb04a5 100644
--- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
+++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h
@@ -26,6 +26,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
NPq::NProto::TDqPqTopicSource&& settings,
ui64 inputIndex,
TTxId txId,
+ ui64 taskId,
const THashMap<TString, TString>& secureParams,
const THashMap<TString, TString>& taskParams,
NYdb::TDriver driver,
diff --git a/ydb/library/yql/providers/pq/common/yql_names.h b/ydb/library/yql/providers/pq/common/yql_names.h
index e4ac47c5cd8..184a5d5eb0e 100644
--- a/ydb/library/yql/providers/pq/common/yql_names.h
+++ b/ydb/library/yql/providers/pq/common/yql_names.h
@@ -9,5 +9,6 @@ constexpr TStringBuf ConsumerSetting = "Consumer";
constexpr TStringBuf EndpointSetting = "Endpoint";
constexpr TStringBuf UseSslSetting = "UseSsl";
constexpr TStringBuf AddBearerToTokenSetting = "AddBearerToToken";
+constexpr TStringBuf WatermarksGranularityUsSetting = "WatermarksGranularityUs";
} // namespace NYql
diff --git a/ydb/library/yql/providers/pq/proto/dq_io.proto b/ydb/library/yql/providers/pq/proto/dq_io.proto
index c5c3ebe1d0f..5ade3091b42 100644
--- a/ydb/library/yql/providers/pq/proto/dq_io.proto
+++ b/ydb/library/yql/providers/pq/proto/dq_io.proto
@@ -14,6 +14,11 @@ enum EClusterType {
DataStreams = 2;
}
+message TWatermarks {
+ bool Enabled = 1;
+ uint64 GranularityUs = 2;
+}
+
message TDqPqTopicSource {
string TopicPath = 1;
string ConsumerName = 2;
@@ -25,6 +30,7 @@ message TDqPqTopicSource {
bool AddBearerToToken = 8;
string DatabaseId = 9;
repeated string MetadataFields = 10;
+ TWatermarks Watermarks = 11;
}
message TDqPqTopicSink {
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
index e4620cb2a8e..5285e586d09 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp
@@ -69,7 +69,7 @@ public:
return 0;
}
- TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override {
+ TExprNode::TPtr WrapRead(const TDqSettings& dqSettings, const TExprNode::TPtr& read, TExprContext& ctx) override {
if (const auto& maybePqReadTopic = TMaybeNode<TPqReadTopic>(read)) {
const auto& pqReadTopic = maybePqReadTopic.Cast();
@@ -126,7 +126,7 @@ public:
.Input<TDqPqTopicSource>()
.Topic(pqReadTopic.Topic())
.Columns(std::move(columns))
- .Settings(BuildTopicReadSettings(clusterName, read->Pos(), ctx))
+ .Settings(BuildTopicReadSettings(clusterName, dqSettings, read->Pos(), ctx))
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()
@@ -200,6 +200,9 @@ public:
srcDesc.SetUseSsl(FromString<bool>(Value(setting)));
} else if (name == AddBearerToTokenSetting) {
srcDesc.SetAddBearerToToken(FromString<bool>(Value(setting)));
+ } else if (name == WatermarksGranularityUsSetting) {
+ srcDesc.MutableWatermarks()->SetEnabled(true);
+ srcDesc.MutableWatermarks()->SetGranularityUs(FromString<ui64>(Value(setting)));
}
}
@@ -259,7 +262,12 @@ public:
}
}
- NNodes::TCoNameValueTupleList BuildTopicReadSettings(const TString& cluster, TPositionHandle pos, TExprContext& ctx) const {
+ NNodes::TCoNameValueTupleList BuildTopicReadSettings(
+ const TString& cluster,
+ const TDqSettings& dqSettings,
+ TPositionHandle pos,
+ TExprContext& ctx) const
+ {
TVector<TCoNameValueTuple> props;
{
@@ -283,6 +291,11 @@ public:
Add(props, AddBearerToTokenSetting, "1", pos, ctx);
}
+ if (dqSettings.WatermarksMode.Get().GetOrElse("") == "default") {
+ const auto granularity = TDuration::MilliSeconds(dqSettings.WatermarksGranularityMs.Get().GetOrElse(1000));
+ Add(props, WatermarksGranularityUsSetting, ToString(granularity.MicroSeconds()), pos, ctx);
+ }
+
return Build<TCoNameValueTupleList>(ctx, pos)
.Add(props)
.Done();
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index a9ef106a6d6..51c63e42b60 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -227,7 +227,7 @@ private:
}
}
- i64 GetAsyncInputData(TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final {
+ i64 GetAsyncInputData(TUnboxedValueVector& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
i64 total = 0LL;
if (!Blocks.empty()) {
buffer.reserve(buffer.size() + Blocks.size());
@@ -680,7 +680,7 @@ private:
void CommitState(const NDqProto::TCheckpoint&) final {}
ui64 GetInputIndex() const final { return InputIndex; }
- i64 GetAsyncInputData(TUnboxedValueVector& output, bool& finished, i64 free) final {
+ i64 GetAsyncInputData(TUnboxedValueVector& output, TMaybe<TInstant>&, bool& finished, i64 free) final {
i64 total = 0LL;
if (!Blocks.empty()) do {
auto& block = std::get<NDB::Block>(Blocks.front());
diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp
index ae7e78e0c7e..172483b1189 100644
--- a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp
+++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp
@@ -125,7 +125,7 @@ private:
TActorBootstrapped<TYdbReadActor>::PassAway();
}
- i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final {
+ i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final {
i64 total = 0LL;
if (!Blocks.empty()) {
buffer.reserve(buffer.size() + Blocks.size());