diff options
author | d-mokhnatkin <d-mokhnatkin@ydb.tech> | 2022-09-15 19:30:35 +0300 |
---|---|---|
committer | d-mokhnatkin <d-mokhnatkin@ydb.tech> | 2022-09-15 19:30:35 +0300 |
commit | 85c93b9ca9dd965a0795dbfd8cb29c4ffc3ec8b9 (patch) | |
tree | f27aefdee03a03c7a3721e7a6b8e9047c5b7e350 | |
parent | e13e0cbfa495fea6d77c2293d096b440feef81dd (diff) | |
download | ydb-85c93b9ca9dd965a0795dbfd8cb29c4ffc3ec8b9.tar.gz |
watermarks implementation
60 files changed, 1426 insertions, 192 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 9b75341d47e..291a547b873 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -152,7 +152,7 @@ private: TActorBootstrapped<TKqpStreamLookupActor>::PassAway(); } - i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64) final { + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, TMaybe<TInstant>&, bool& finished, i64) final { i64 totalDataSize = 0; if (TableScheme) { diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index 0546662c690..366a0dd290b 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -1328,6 +1328,8 @@ private: apply("EnableComputeActor", "1"); apply("ComputeActorType", "async"); apply("_EnablePrecompute", "1"); + apply("WatermarksMode", "disable"); + apply("WatermarksGranularityMs", "1000"); switch (Params.QueryType) { case YandexQuery::QueryContent::STREAMING: { diff --git a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json index 342a1f2e704..dc77e6f1275 100644 --- a/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json +++ b/ydb/library/yql/core/expr_nodes/yql_expr_nodes.json @@ -1690,7 +1690,8 @@ {"Index": 9, "Name": "SaveHandler", "Type": "TExprBase"}, {"Index": 10, "Name": "LoadHandler", "Type": "TExprBase"}, {"Index": 11, "Name": "MergeHandler", "Type": "TCoLambda"}, - {"Index": 12, "Name": "FinishHandler", "Type": "TCoLambda"} + {"Index": 12, "Name": "FinishHandler", "Type": "TCoLambda"}, + {"Index": 13, "Name": "WatermarkMode", "Type": "TExprBase"} ] }, { diff --git a/ydb/library/yql/core/type_ann/type_ann_list.cpp b/ydb/library/yql/core/type_ann/type_ann_list.cpp index 17c32e5eb35..e8a69409684 100644 --- a/ydb/library/yql/core/type_ann/type_ann_list.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_list.cpp @@ -5003,7 +5003,7 @@ namespace { TStringBuilder() << "Unsupported agg name: " << name)); return IGraphTransformer::TStatus::Error; } - + return IGraphTransformer::TStatus::Ok; } @@ -6053,7 +6053,7 @@ namespace { IGraphTransformer::TStatus MultiHoppingCoreWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { Y_UNUSED(output); - if (!EnsureArgsCount(*input, 13, ctx.Expr)) { + if (!EnsureArgsCount(*input, 14, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } auto& item = input->ChildRef(0); diff --git a/ydb/library/yql/dq/actors/compute/CMakeLists.txt b/ydb/library/yql/dq/actors/compute/CMakeLists.txt index 02fbceab3a9..c1d52365482 100644 --- a/ydb/library/yql/dq/actors/compute/CMakeLists.txt +++ b/ydb/library/yql/dq/actors/compute/CMakeLists.txt @@ -27,12 +27,14 @@ target_link_libraries(dq-actors-compute PUBLIC yql-public-issue ) target_sources(dq-actors-compute PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp - ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/dq_compute_issues_buffer.cpp ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/retry_queue.cpp ) diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index e90e3ec0107..7eb7ec80072 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -259,7 +259,7 @@ private: return true; // handled channels syncronously } CA_LOG_D("DoHandleChannelsAfterFinishImpl"); - AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(std::move(req), /* checkpointOnly = */ true)); + AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(GetWatermarkRequest(), std::move(req), /* checkpointOnly = */ true)); return false; } @@ -289,15 +289,40 @@ private: auto finished = channelData.GetFinished(); + TMaybe<TInstant> watermark; + if (channelData.HasWatermark()) { + watermark = TInstant::MicroSeconds(channelData.GetWatermark().GetTimestampUs()); + + const bool channelWatermarkChanged = WatermarksTracker.NotifyInChannelWatermarkReceived( + inputChannel->ChannelId, + *watermark + ); + + if (channelWatermarkChanged) { + CA_LOG_T("Pause input channel " << channelData.GetChannelId() << " bacause of watermark " << *watermark); + inputChannel->Pause(*watermark); + } + + WatermarkTakeInputChannelDataRequests[*watermark]++; + } + + DqComputeActorMetrics.ReportInputChannelWatermark( + channelData.GetChannelId(), + channelData.GetData().GetRows(), + watermark); + auto ev = (channelData.GetData().GetRows()) ? MakeHolder<NTaskRunnerActor::TEvPush>( channelData.GetChannelId(), std::move(*channelData.MutableData()), finished, - /*askFreeSpace = */ true, + /* askFreeSpace = */ true, /* pauseAfterPush = */ channelData.HasCheckpoint()) : MakeHolder<NTaskRunnerActor::TEvPush>( - channelData.GetChannelId(), finished, /*askFreeSpace = */ true, /* pauseAfterPush = */ channelData.HasCheckpoint()); + channelData.GetChannelId(), + finished, + /* askFreeSpace = */ true, + /* pauseAfterPush = */ channelData.HasCheckpoint()); Send(TaskRunnerActorId, ev.Release(), 0, Cookie); @@ -309,7 +334,7 @@ private: Checkpoints->RegisterCheckpoint(checkpoint, channelData.GetChannelId()); } - TakeInputChannelDataRequests[Cookie++] = TTakeInputChannelData{ack, channelData.GetChannelId()}; + TakeInputChannelDataRequests[Cookie++] = TTakeInputChannelData{ack, channelData.GetChannelId(), watermark}; } void PassAway() override { @@ -327,6 +352,31 @@ private: TBase::PassAway(); } + TMaybe<NTaskRunnerActor::TWatermarkRequest> GetWatermarkRequest() { + if (!WatermarksTracker.HasPendingWatermark()) { + return Nothing(); + } + + const auto pendingWatermark = *WatermarksTracker.GetPendingWatermark(); + if (WatermarkTakeInputChannelDataRequests.contains(pendingWatermark)) { + // Not all precending to watermark input channels data has been injected + return Nothing(); + } + + TVector<ui32> channelIds; + for (const auto& [channelId, info] : OutputChannelsMap) { + if (info.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) { + continue; + } + + channelIds.emplace_back(channelId); + } + + DqComputeActorMetrics.ReportInjectedToTaskRunnerWatermark(pendingWatermark); + + return TMaybe<NTaskRunnerActor::TWatermarkRequest>({std::move(channelIds), pendingWatermark}); + } + TMaybe<NTaskRunnerActor::TCheckpointRequest> GetCheckpointRequest() { TMaybe<NTaskRunnerActor::TCheckpointRequest> req = Nothing(); if (!CheckpointRequestedFromTaskRunner && Checkpoints && Checkpoints->HasPendingCheckpoint() && !Checkpoints->ComputeActorStateSaved()) { @@ -343,7 +393,7 @@ private: if (ProcessSourcesState.Inflight == 0) { auto req = GetCheckpointRequest(); CA_LOG_T("DoExecuteImpl: " << (bool) req); - AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(std::move(req), /* checkpointOnly = */ false)); + AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(GetWatermarkRequest(), std::move(req), /* checkpointOnly = */ false)); } } @@ -428,6 +478,11 @@ private: PollSources(std::move(sourcesState)); } + if (ev->Get()->WatermarkInjectedToOutputs && !WatermarksTracker.HasOutputChannels()) { + ResumeInputsByWatermark(*WatermarksTracker.GetPendingWatermark()); + WatermarksTracker.PopPendingWatermark(); + } + ReadyToCheckpointFlag = (bool) ev->Get()->ProgramState; if (ev->Get()->CheckpointRequestedFromTaskRunner) { CheckpointRequestedFromTaskRunner = false; @@ -473,7 +528,7 @@ private: ProcessSourcesState.Inflight--; if (ProcessSourcesState.Inflight == 0) { CA_LOG_T("send TEvContinueRun on OnAsyncInputPushFinished"); - AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>()); + AskContinueRun(std::make_unique<NTaskRunnerActor::TEvContinueRun>(GetWatermarkRequest(), Nothing(), false)); } } @@ -487,6 +542,7 @@ private: TOutputChannelInfo& outputChannel = it->second; Y_VERIFY(!outputChannel.AsyncData); // have finished previous cycle outputChannel.AsyncData.ConstructInPlace(); + outputChannel.AsyncData->Watermark = std::move(ev->Get()->Watermark); outputChannel.AsyncData->Data = std::move(ev->Get()->Data); outputChannel.AsyncData->Checkpoint = std::move(ev->Get()->Checkpoint); outputChannel.AsyncData->Finished = ev->Get()->Finished; @@ -519,10 +575,23 @@ private: ProcessOutputsState.HasDataToSend |= !outputChannel.Finished; ProcessOutputsState.LastPopReturnedNoData = asyncData.Data.empty(); + if (asyncData.Watermark.Defined()) { + const auto watermark = TInstant::MicroSeconds(asyncData.Watermark->GetTimestampUs()); + const bool shouldResumeInputs = WatermarksTracker.NotifyOutputChannelWatermarkSent( + outputChannel.ChannelId, + watermark + ); + + if (shouldResumeInputs) { + ResumeInputsByWatermark(watermark); + } + } + if (!shouldSkipData) { if (asyncData.Checkpoint.Defined()) { - ResumeInputs(); + ResumeInputsByCheckpoint(); } + for (ui32 i = 0; i < asyncData.Data.size(); i++) { auto& chunk = asyncData.Data[i]; NDqProto::TChannelData channelData; @@ -530,16 +599,22 @@ private: // set finished only for last chunk const bool lastChunk = i == asyncData.Data.size() - 1; channelData.SetFinished(asyncData.Finished && lastChunk); + channelData.MutableData()->Swap(&chunk); + if (lastChunk && asyncData.Watermark.Defined()) { + channelData.MutableWatermark()->Swap(&*asyncData.Watermark); + } if (lastChunk && asyncData.Checkpoint.Defined()) { channelData.MutableCheckpoint()->Swap(&*asyncData.Checkpoint); } - channelData.MutableData()->Swap(&chunk); Channels->SendChannelData(std::move(channelData)); } if (asyncData.Data.empty() && asyncData.Changed) { NDqProto::TChannelData channelData; channelData.SetChannelId(outputChannel.ChannelId); channelData.SetFinished(asyncData.Finished); + if (asyncData.Watermark.Defined()) { + channelData.MutableWatermark()->Swap(&*asyncData.Watermark); + } if (asyncData.Checkpoint.Defined()) { channelData.MutableCheckpoint()->Swap(&*asyncData.Checkpoint); } @@ -573,6 +648,19 @@ private: auto it = TakeInputChannelDataRequests.find(ev->Cookie); YQL_ENSURE(it != TakeInputChannelDataRequests.end()); + CA_LOG_T("Input data push finished. Cookie: " << ev->Cookie + << " Watermark: " << it->second.Watermark + << " Ack: " << it->second.Ack + << " TakeInputChannelDataRequests: " << TakeInputChannelDataRequests.size() + << " WatermarkTakeInputChannelDataRequests: " << WatermarkTakeInputChannelDataRequests.size()); + + if (it->second.Watermark.Defined()) { + auto& ct = WatermarkTakeInputChannelDataRequests.at(*it->second.Watermark); + if (--ct == 0) { + WatermarkTakeInputChannelDataRequests.erase(*it->second.Watermark); + } + } + if (it->second.Ack) { TInputChannelInfo* inputChannel = InputChannelsMap.FindPtr(it->second.ChannelId); Channels->SendChannelDataAck(it->second.ChannelId, inputChannel->FreeSpace); @@ -602,7 +690,7 @@ private: } if (checkpoint) { CA_LOG_I("Resume inputs"); - ResumeInputs(); + ResumeInputsByCheckpoint(); } sinkInfo.PopStarted = false; @@ -745,6 +833,7 @@ private: ProcessContinueRun(); } +private: NKikimr::NMiniKQL::TTypeEnvironment* TypeEnv = nullptr; NTaskRunnerActor::ITaskRunnerActor* TaskRunnerActor = nullptr; NActors::TActorId TaskRunnerActorId; @@ -762,8 +851,12 @@ private: struct TTakeInputChannelData { bool Ack; ui64 ChannelId; + TMaybe<TInstant> Watermark; }; THashMap<ui64, TTakeInputChannelData> TakeInputChannelDataRequests; + // Watermark should be injected to task runner only after all precending data is injected + // This hash map will help to track the right moment + THashMap<TInstant, ui32> WatermarkTakeInputChannelDataRequests; ui64 Cookie = 0; NDq::TDqTaskRunnerStatsView TaskRunnerStats; bool ReadyToCheckpointFlag; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 6be9f937132..c64e3255973 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -69,9 +69,14 @@ struct IDqComputeActorAsyncInput { virtual ui64 GetInputIndex() const = 0; // Gets data and returns space used by filled data batch. + // Watermark will be returned if source watermark was moved forward. Watermark should be handled AFTER data. // Method should be called under bound mkql allocator. // Could throw YQL errors. - virtual i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& batch, bool& finished, i64 freeSpace) = 0; + virtual i64 GetAsyncInputData( + NKikimr::NMiniKQL::TUnboxedValueVector& batch, + TMaybe<TInstant>& watermark, + bool& finished, + i64 freeSpace) = 0; // Checkpointing. virtual void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TSourceState& state) = 0; @@ -150,6 +155,7 @@ public: const NDqProto::TTaskInput& InputDesc; ui64 InputIndex; TTxId TxId; + ui64 TaskId; const THashMap<TString, TString>& SecureParams; const THashMap<TString, TString>& TaskParams; const NActors::TActorId& ComputeActorId; @@ -161,6 +167,7 @@ public: const NDqProto::TTaskOutput& OutputDesc; ui64 OutputIndex; TTxId TxId; + ui64 TaskId; IDqComputeActorAsyncOutput::ICallbacks* Callback; const THashMap<TString, TString>& SecureParams; const THashMap<TString, TString>& TaskParams; @@ -173,6 +180,7 @@ public: const NDqProto::TTaskInput& InputDesc; const ui64 InputIndex; TTxId TxId; + ui64 TaskId; const NUdf::TUnboxedValue TransformInput; const THashMap<TString, TString>& SecureParams; const THashMap<TString, TString>& TaskParams; @@ -186,6 +194,7 @@ public: const NDqProto::TTaskOutput& OutputDesc; const ui64 OutputIndex; TTxId TxId; + ui64 TaskId; const IDqOutputConsumer::TPtr TransformOutput; IDqComputeActorAsyncOutput::ICallbacks* Callback; const THashMap<TString, TString>& SecureParams; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp index f4fdf0569da..e4c6b2f8558 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_channels.cpp @@ -109,6 +109,7 @@ void TDqComputeActorChannels::HandleWork(TEvDqCompute::TEvChannelData::TPtr& ev) << ", seqNo: " << record.GetSeqNo() << ", size: " << channelData.GetData().GetRaw().size() << ", rows: " << channelData.GetData().GetRows() + << ", watermark: " << channelData.HasWatermark() << ", checkpoint: " << channelData.HasCheckpoint() << ", finished: " << channelData.GetFinished() << ", from: " << ev->Sender @@ -555,6 +556,7 @@ void TDqComputeActorChannels::SendChannelData(NDqProto::TChannelData&& channelDa << ", peer: " << *outputChannel.Peer << ", rows: " << chunkRows << ", bytes: " << chunkBytes + << ", watermark: " << channelData.HasWatermark() << ", checkpoint: " << channelData.HasCheckpoint() << ", seqNo: " << seqNo << ", finished: " << finished); diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp index f47e15c2666..1f6956773d7 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp @@ -226,7 +226,7 @@ void TDqComputeActorCheckpoints::Handle(TEvDqCompute::TEvNewCheckpointCoordinato AbortCheckpoint(); if (resumeInputs) { LOG_W("Drop pending checkpoint since coordinator is stale"); - ComputeActor->ResumeInputs(); + ComputeActor->ResumeInputsByCheckpoint(); } } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h index 61c007708da..bbfb4a327fc 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.h @@ -72,7 +72,7 @@ public: virtual void SaveState(const NDqProto::TCheckpoint& checkpoint, NDqProto::TComputeActorState& state) const = 0; virtual void CommitState(const NDqProto::TCheckpoint& checkpoint) = 0; virtual void InjectBarrierToOutputs(const NDqProto::TCheckpoint& checkpoint) = 0; - virtual void ResumeInputs() = 0; + virtual void ResumeInputsByCheckpoint() = 0; virtual void Start() = 0; virtual void Stop() = 0; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index fd667d846df..eab5bcf8a0b 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1,9 +1,11 @@ #pragma once -#include "dq_compute_actor.h" +#include "dq_compute_actor_async_io.h" #include "dq_compute_actor_channels.h" #include "dq_compute_actor_checkpoints.h" -#include "dq_compute_actor_async_io.h" +#include "dq_compute_actor_metrics.h" +#include "dq_compute_actor_watermarks.h" +#include "dq_compute_actor.h" #include "dq_compute_issues_buffer.h" #include "dq_compute_memory_quota.h" @@ -28,27 +30,28 @@ #include <util/system/hostname.h> #include <any> +#include <queue> #if defined CA_LOG_D || defined CA_LOG_I || defined CA_LOG_E || defined CA_LOG_C # error log macro definition clash #endif #define CA_LOG_T(s) \ - LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s) + LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define CA_LOG_D(s) \ - LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s) + LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define CA_LOG_I(s) \ - LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s) + LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define CA_LOG_W(s) \ - LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s) + LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define CA_LOG_N(s) \ - LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s) + LOG_NOTICE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define CA_LOG_E(s) \ - LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s) + LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define CA_LOG_C(s) \ - LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s) + LOG_CRIT_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, LogPrefix << s) #define CA_LOG(prio, s) \ - LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". " << s) + LOG_LOG_S(*NActors::TlsActivationContext, prio, NKikimrServices::KQP_COMPUTE, LogPrefix << s) namespace NYql { @@ -176,6 +179,7 @@ protected: : ExecuterId(executerId) , TxId(txId) , Task(std::move(task)) + , LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << Task.GetId() << ". ") , RuntimeSettings(settings) , MemoryLimits(memoryLimits) , CanAllocateExtraMemory(RuntimeSettings.ExtraMemoryAllocationPool != 0 && MemoryLimits.AllocateMemoryFn) @@ -184,6 +188,8 @@ protected: , CheckpointingMode(GetTaskCheckpointingMode(Task)) , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) , MemoryQuota(ownMemoryQuota ? InitMemoryQuota() : nullptr) + , WatermarksTracker(this->SelfId(), TxId, Task.GetId()) + , DqComputeActorMetrics(taskCounters) , Running(!Task.GetCreateSuspended()) , PassExceptions(passExceptions) , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor") @@ -193,6 +199,7 @@ protected: } InitializeTask(); InitMonCounters(taskCounters); + InitializeWatermarks(); } TDqComputeActorBase(const NActors::TActorId& executerId, const TTxId& txId, const NDqProto::TDqTask& task, @@ -211,6 +218,8 @@ protected: , FunctionRegistry(functionRegistry) , State(Task.GetCreateSuspended() ? NDqProto::COMPUTE_STATE_UNKNOWN : NDqProto::COMPUTE_STATE_EXECUTING) , MemoryQuota(InitMemoryQuota()) + , WatermarksTracker(this->SelfId(), TxId, Task.GetId()) + , DqComputeActorMetrics(taskCounters) , Running(!Task.GetCreateSuspended()) , ComputeActorSpan(NKikimr::TWilsonKqp::ComputeActor, std::move(traceId), "ComputeActor") { @@ -219,6 +228,7 @@ protected: } InitializeTask(); InitMonCounters(taskCounters); + InitializeWatermarks(); } void InitMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) { @@ -775,9 +785,33 @@ protected: } } - void ResumeInputs() override { + void ResumeInputsByWatermark(TInstant watermark) { + for (auto& [id, sourceInfo] : SourcesMap) { + if (sourceInfo.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) { + continue; + } + + const auto channelId = id; + CA_LOG_T("Resume source " << channelId << " by completed watermark"); + + sourceInfo.ResumeByWatermark(watermark); + } + for (auto& [id, channelInfo] : InputChannelsMap) { - channelInfo.Resume(); + if (channelInfo.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED) { + continue; + } + + const auto channelId = id; + CA_LOG_T("Resume input channel " << channelId << " by completed watermark"); + + channelInfo.ResumeByWatermark(watermark); + } + } + + void ResumeInputsByCheckpoint() override { + for (auto& [id, channelInfo] : InputChannelsMap) { + channelInfo.ResumeByCheckpoint(); } } @@ -839,25 +873,40 @@ protected: protected: struct TInputChannelInfo { + const TString LogPrefix; ui64 ChannelId; IDqInputChannel::TPtr Channel; bool HasPeer = false; + std::queue<TInstant> PendingWatermarks; + const NDqProto::EWatermarksMode WatermarksMode; std::optional<NDqProto::TCheckpoint> PendingCheckpoint; const NDqProto::ECheckpointingMode CheckpointingMode; ui64 FreeSpace = 0; - explicit TInputChannelInfo(ui64 channelId, NDqProto::ECheckpointingMode checkpointingMode) - : ChannelId(channelId) + explicit TInputChannelInfo( + const TString& logPrefix, + ui64 channelId, + NDqProto::EWatermarksMode watermarksMode, + NDqProto::ECheckpointingMode checkpointingMode) + : LogPrefix(logPrefix) + , ChannelId(channelId) + , WatermarksMode(watermarksMode) , CheckpointingMode(checkpointingMode) { } bool IsPaused() const { - return PendingCheckpoint.has_value(); + return PendingWatermarks.empty() || PendingCheckpoint.has_value(); + } + + void Pause(TInstant watermark) { + YQL_ENSURE(WatermarksMode != NDqProto::WATERMARKS_MODE_DISABLED); + + PendingWatermarks.emplace(watermark); } void Pause(const NDqProto::TCheckpoint& checkpoint) { - YQL_ENSURE(!IsPaused()); + YQL_ENSURE(!PendingCheckpoint); YQL_ENSURE(CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED); PendingCheckpoint = checkpoint; if (Channel) { // async actor doesn't hold channels, so channel is paused in task runner actor @@ -865,7 +914,17 @@ protected: } } - void Resume() { + void ResumeByWatermark(TInstant watermark) { + while (!PendingWatermarks.empty() && PendingWatermarks.front() <= watermark) { + if (PendingWatermarks.front() != watermark) { + CA_LOG_W("Input channel " << ChannelId << + " watermarks were collapsed. See YQ-1441. Dropped watermark: " << PendingWatermarks.front()); + } + PendingWatermarks.pop(); + } + } + + void ResumeByCheckpoint() { PendingCheckpoint.reset(); if (Channel) { // async actor doesn't hold channels, so channel is resumed in task runner actor Channel->Resume(); @@ -874,6 +933,7 @@ protected: }; struct TAsyncInputInfoBase { + const TString LogPrefix; ui64 Index; IDqAsyncInputBuffer::TPtr Buffer; IDqComputeActorAsyncInput* AsyncInput = nullptr; @@ -882,8 +942,31 @@ protected: bool Finished = false; i64 FreeSpace = 1; bool PushStarted = false; + const NDqProto::EWatermarksMode WatermarksMode = NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED; + TMaybe<TInstant> PendingWatermark = Nothing(); + + explicit TAsyncInputInfoBase( + const TString& logPrefix, + ui64 index, + NDqProto::EWatermarksMode watermarksMode) + : LogPrefix(logPrefix) + , Index(index) + , IssuesBuffer(IssuesBufferSize) + , WatermarksMode(watermarksMode) {} + + bool IsPausedByWatermark() { + return PendingWatermark.Defined(); + } - explicit TAsyncInputInfoBase(ui64 index) : Index(index), IssuesBuffer(IssuesBufferSize) {} + void Pause(TInstant watermark) { + YQL_ENSURE(WatermarksMode != NDqProto::WATERMARKS_MODE_DISABLED); + PendingWatermark = watermark; + } + + void ResumeByWatermark(TInstant watermark) { + YQL_ENSURE(watermark == PendingWatermark); + PendingWatermark = Nothing(); + } }; struct TAsyncInputTransformInfo : public TAsyncInputInfoBase { @@ -900,6 +983,7 @@ protected: bool Finished = false; // != Channel->IsFinished() // If channel is in finished state, it sends only checkpoints. bool PopStarted = false; bool IsTransformOutput = false; // Is this channel output of a transform. + NDqProto::EWatermarksMode WatermarksMode = NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED; explicit TOutputChannelInfo(ui64 channelId) : ChannelId(channelId) @@ -913,6 +997,7 @@ protected: struct TAsyncData { // Is used in case of async compute actor TVector<NDqProto::TData> Data; + TMaybe<NDqProto::TWatermark> Watermark; TMaybe<NDqProto::TCheckpoint> Checkpoint; bool Finished = false; bool Changed = false; @@ -1133,7 +1218,6 @@ protected: this->TerminateSources(issues, success); if (ev->Sender != ExecuterId) { - if (ComputeActorSpan) { ComputeActorSpan.End(); } @@ -1217,11 +1301,13 @@ private: auto channel = outputChannel.Channel; NDqProto::TData data; + NDqProto::TWatermark watermark; NDqProto::TCheckpoint checkpoint; bool hasData = channel->Pop(data, bytes); + bool hasWatermark = channel->Pop(watermark); bool hasCheckpoint = channel->Pop(checkpoint); - if (!hasData && !hasCheckpoint) { + if (!hasData && !hasWatermark && !hasCheckpoint) { if (!channel->IsFinished()) { CA_LOG_D("output channelId: " << channel->GetChannelId() << ", nothing to send and is not finished"); return 0; // channel is empty and not finished yet @@ -1232,6 +1318,7 @@ private: const bool becameFinished = !wasFinished && outputChannel.Finished; ui32 dataSize = data.GetRaw().size(); + ui32 watermarkSize = watermark.ByteSize(); ui32 checkpointSize = checkpoint.ByteSize(); NDqProto::TChannelData channelData; @@ -1240,15 +1327,22 @@ private: if (hasData) { channelData.MutableData()->Swap(&data); } + if (hasWatermark) { + channelData.MutableWatermark()->Swap(&watermark); + CA_LOG_I("Resume inputs by watermark"); + // This is excessive, inputs should be resumed after async CA received response with watermark from task runner. + // But, let it be here, it's better to have the same code as in checkpoints + ResumeInputsByWatermark(TInstant::MicroSeconds(watermark.GetTimestampUs())); + } if (hasCheckpoint) { channelData.MutableCheckpoint()->Swap(&checkpoint); - CA_LOG_I("Resume inputs"); - ResumeInputs(); + CA_LOG_I("Resume inputs by checkpoint"); + ResumeInputsByCheckpoint(); } - if (hasData || hasCheckpoint || becameFinished) { + if (hasData || hasWatermark || hasCheckpoint || becameFinished) { Channels->SendChannelData(std::move(channelData)); - return dataSize + checkpointSize; + return dataSize + watermarkSize + checkpointSize; } return 0; } @@ -1315,7 +1409,7 @@ private: if (hasCheckpoint) { maybeCheckpoint = checkpoint; CA_LOG_I("Resume inputs"); - ResumeInputs(); + ResumeInputsByCheckpoint(); } outputInfo.AsyncOutput->SendData(std::move(dataBatch), dataSize, maybeCheckpoint, outputInfo.Finished); @@ -1423,6 +1517,7 @@ protected: .InputDesc = inputDesc, .InputIndex = inputIndex, .TxId = TxId, + .TaskId = Task.GetId(), .TransformInput = transform.InputBuffer, .SecureParams = secureParams, .TaskParams = taskParams, @@ -1498,16 +1593,24 @@ protected: void PollAsyncInput(TAsyncInputInfoBase& info, ui64 inputIndex) { Y_VERIFY(!TaskRunner || info.Buffer); + if (info.Finished) { CA_LOG_D("Skip polling async input[" << inputIndex << "]: finished"); return; } + + if (info.IsPausedByWatermark()) { + CA_LOG_T("Skip polling async input[" << inputIndex << "]: paused"); + return; + } + const i64 freeSpace = AsyncIoFreeSpace(info); if (freeSpace > 0) { + TMaybe<TInstant> watermark; NKikimr::NMiniKQL::TUnboxedValueVector batch; Y_VERIFY(info.AsyncInput); bool finished = false; - const i64 space = info.AsyncInput->GetAsyncInputData(batch, finished, freeSpace); + const i64 space = info.AsyncInput->GetAsyncInputData(batch, watermark, finished, freeSpace); CA_LOG_T("Poll async input " << inputIndex << ". Buffer free space: " << freeSpace << ", read from async input: " << space << " bytes, " @@ -1519,6 +1622,20 @@ protected: // but we haven't read all of it. ContinueExecute(); } + + DqComputeActorMetrics.ReportAsyncInputData(inputIndex, batch.size(), watermark); + + if (watermark) { + const auto inputWatermarkChanged = WatermarksTracker.NotifyAsyncInputWatermarkReceived( + inputIndex, + *watermark); + + if (inputWatermarkChanged) { + CA_LOG_T("Pause async input " << inputIndex << " because of watermark " << *watermark); + info.Pause(*watermark); + } + } + AsyncInputPush(std::move(batch), info, space, finished); } } @@ -1628,16 +1745,28 @@ private: Y_VERIFY(!inputDesc.HasSource() || inputDesc.ChannelsSize() == 0); // HasSource => no channels if (inputDesc.HasTransform()) { - auto result = InputTransformsMap.emplace(std::piecewise_construct, std::make_tuple(i), std::make_tuple(i)); + auto result = InputTransformsMap.emplace( + std::piecewise_construct, + std::make_tuple(i), + std::make_tuple(LogPrefix, i, NDqProto::WATERMARKS_MODE_DISABLED) + ); YQL_ENSURE(result.second); } if (inputDesc.HasSource()) { - auto result = SourcesMap.emplace(std::piecewise_construct, std::make_tuple(i), std::make_tuple(i)); + const auto watermarksMode = inputDesc.GetSource().GetWatermarksMode(); + auto result = SourcesMap.emplace(i, TAsyncInputInfoBase(LogPrefix, i, watermarksMode)); YQL_ENSURE(result.second); } else { for (auto& channel : inputDesc.GetChannels()) { - auto result = InputChannelsMap.emplace(channel.GetId(), TInputChannelInfo(channel.GetId(), channel.GetCheckpointingMode())); + auto result = InputChannelsMap.emplace( + channel.GetId(), + TInputChannelInfo( + LogPrefix, + channel.GetId(), + channel.GetWatermarksMode(), + channel.GetCheckpointingMode()) + ); YQL_ENSURE(result.second); } } @@ -1661,6 +1790,7 @@ private: TOutputChannelInfo outputChannel(channel.GetId()); outputChannel.HasPeer = channel.GetDstEndpoint().HasActorId(); outputChannel.IsTransformOutput = outputDesc.HasTransform(); + outputChannel.WatermarksMode = channel.GetWatermarksMode(); if (Y_UNLIKELY(RuntimeSettings.StatsMode >= NDqProto::DQ_STATS_MODE_PROFILE)) { outputChannel.Stats = MakeHolder<typename TOutputChannelInfo::TStats>(); @@ -1673,6 +1803,26 @@ private: } } + void InitializeWatermarks() { + for (const auto& [id, source] : SourcesMap) { + if (source.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DEFAULT) { + WatermarksTracker.RegisterAsyncInput(id); + } + } + + for (const auto& [id, channel] : InputChannelsMap) { + if (channel.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DEFAULT) { + WatermarksTracker.RegisterInputChannel(id); + } + } + + for (const auto& [id, channel] : OutputChannelsMap) { + if (channel.WatermarksMode == NDqProto::EWatermarksMode::WATERMARKS_MODE_DEFAULT) { + WatermarksTracker.RegisterOutputChannel(id); + } + } + } + virtual const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() { if (!TaskRunner) { return nullptr; @@ -1815,6 +1965,7 @@ protected: const NActors::TActorId ExecuterId; const TTxId TxId; const NDqProto::TDqTask Task; + const TString LogPrefix; const TComputeRuntimeSettings RuntimeSettings; const TComputeMemoryLimits MemoryLimits; const bool CanAllocateExtraMemory = false; @@ -1850,6 +2001,8 @@ protected: TProcessOutputsState ProcessOutputsState; THolder<TDqMemoryQuota> MemoryQuota; + TDqComputeActorWatermarks WatermarksTracker; + TDqComputeActorMetrics DqComputeActorMetrics; private: bool Running = true; TInstant LastSendStatsTime; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp new file mode 100644 index 00000000000..a6547c94aa5 --- /dev/null +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.cpp @@ -0,0 +1,93 @@ +#include "dq_compute_actor_metrics.h" + +namespace NYql::NDq { + +TDqComputeActorMetrics::TDqComputeActorMetrics(const NMonitoring::TDynamicCounterPtr& counters) { + if (!counters) { + return; + } + + ComputeActorSubgroup = counters->GetSubgroup("subsystem", "compute_actor"); + InjectedToTaskRunnerWatermark = ComputeActorSubgroup->GetCounter("watermark_injected_ms"); + InjectedToOutputsWatermark = ComputeActorSubgroup->GetCounter("watermark_outputs_ms"); + WatermarkCollectLatency = ComputeActorSubgroup->GetHistogram( + "watermark_collect_ms", + NMonitoring::ExplicitHistogram({0, 15, 50, 100, 250, 500, 1000, 10'000, 100'000})); +} + +void TDqComputeActorMetrics::ReportAsyncInputData(ui32 id, ui64 dataSize, TMaybe<TInstant> watermark) { + if (!Enable) { + return; + } + + auto counters = GetAsyncInputCounters(id); + counters->GetCounter("rows", true)->Add(dataSize); + + if (!watermark) { + return; + } + + ReportInputWatermarkMetrics(counters, *watermark); +} + +void TDqComputeActorMetrics::ReportInputChannelWatermark(ui32 id, ui64 dataSize, TMaybe<TInstant> watermark) { + if (!Enable) { + return; + } + + auto counters = GetInputChannelCounters(id); + counters->GetCounter("rows", true)->Add(dataSize); + if (!watermark) { + return; + } + + ReportInputWatermarkMetrics(counters, *watermark); +} + +void TDqComputeActorMetrics::ReportInjectedToTaskRunnerWatermark(TInstant watermark) { + if (!Enable) { + return; + } + + InjectedToTaskRunnerWatermark->Set(watermark.MilliSeconds()); +} + +void TDqComputeActorMetrics::ReportInjectedToOutputsWatermark(TInstant watermark) { + if (!Enable) { + return; + } + + InjectedToOutputsWatermark->Set(watermark.MilliSeconds()); + auto iter = WatermarkStartedAt.find(watermark); + if (iter != WatermarkStartedAt.end()) { + WatermarkCollectLatency->Collect((TInstant::Now() - iter->second).MilliSeconds()); + WatermarkStartedAt.erase(iter); + } +} + +NMonitoring::TDynamicCounterPtr TDqComputeActorMetrics::GetAsyncInputCounters(ui32 id) { + auto iter = AsyncInputsCounters.find(id); + if (iter == AsyncInputsCounters.end()) { + iter = AsyncInputsCounters.emplace(id, ComputeActorSubgroup->GetSubgroup("async_input", ToString(id))).first; + } + + return iter->second; +} + +NMonitoring::TDynamicCounterPtr TDqComputeActorMetrics::GetInputChannelCounters(ui32 id) { + auto iter = InputChannelsCounters.find(id); + if (iter == InputChannelsCounters.end()) { + iter = InputChannelsCounters.emplace(id, ComputeActorSubgroup->GetSubgroup("input_channel", ToString(id))).first; + } + + return iter->second; +} + +void TDqComputeActorMetrics::ReportInputWatermarkMetrics(NMonitoring::TDynamicCounterPtr& counters, TInstant watermark) { + counters->GetCounter("watermark_ms")->Set(watermark.MilliSeconds()); + if (!WatermarkStartedAt.contains(watermark)) { + WatermarkStartedAt[watermark] = TInstant::Now(); + } +} + +} diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h new file mode 100644 index 00000000000..3e9afbf64ae --- /dev/null +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_metrics.h @@ -0,0 +1,36 @@ +#pragma once + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <util/generic/hash.h> + +namespace NYql::NDq { + +struct TDqComputeActorMetrics { +public: + TDqComputeActorMetrics(const NMonitoring::TDynamicCounterPtr& counters); + + void ReportAsyncInputData(ui32 id, ui64 dataSize, TMaybe<TInstant> watermark); + void ReportInputChannelWatermark(ui32 id, ui64 dataSize, TMaybe<TInstant> watermark); + void ReportInjectedToTaskRunnerWatermark(TInstant watermark); + void ReportInjectedToOutputsWatermark(TInstant watermark); + +private: + NMonitoring::TDynamicCounterPtr GetAsyncInputCounters(ui32 id); + NMonitoring::TDynamicCounterPtr GetInputChannelCounters(ui32 id); + void ReportInputWatermarkMetrics(NMonitoring::TDynamicCounterPtr& counters, TInstant watermark); + +private: + bool Enable = false; + NMonitoring::TDynamicCounterPtr ComputeActorSubgroup; + THashMap<ui32, NMonitoring::TDynamicCounterPtr> AsyncInputsCounters; + THashMap<ui32, NMonitoring::TDynamicCounterPtr> InputChannelsCounters; + NMonitoring::TDynamicCounters::TCounterPtr WatermarkCt; + NMonitoring::TDynamicCounters::TCounterPtr InjectedToTaskRunnerWatermark; + NMonitoring::TDynamicCounters::TCounterPtr InjectedToOutputsWatermark; + NMonitoring::THistogramPtr WatermarkCollectLatency; + + THashMap<TInstant, TInstant> WatermarkStartedAt; +}; + +} diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp new file mode 100644 index 00000000000..d99efbdffe4 --- /dev/null +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.cpp @@ -0,0 +1,169 @@ +#include "dq_compute_actor_watermarks.h" + +#include <ydb/core/protos/services.pb.h> + +#include <ydb/library/yql/minikql/comp_nodes/mkql_saveload.h> + +#include <algorithm> + +#define LOG_T(s) \ + LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s) +#define LOG_D(s) \ + LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s) +#define LOG_I(s) \ + LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s) +#define LOG_W(s) \ + LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s) +#define LOG_E(s) \ + LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::KQP_COMPUTE, "SelfId: " << SelfId << ", TxId: " << TxId << ", task: " << TaskId << ". Watermarks. " << s) + +namespace NYql::NDq { + +using namespace NActors; + +TDqComputeActorWatermarks::TDqComputeActorWatermarks( + NActors::TActorIdentity selfId, + const TTxId txId, + ui64 taskId +) + : SelfId(selfId) + , TxId(txId) + , TaskId(taskId) { +} + +void TDqComputeActorWatermarks::RegisterAsyncInput(ui64 inputId) { + AsyncInputsWatermarks[inputId] = Nothing(); +} + +void TDqComputeActorWatermarks::RegisterInputChannel(ui64 inputId) { + InputChannelsWatermarks[inputId] = Nothing(); +} + +void TDqComputeActorWatermarks::RegisterOutputChannel(ui64 outputId) { + OutputChannelsWatermarks[outputId] = Nothing(); +} + +bool TDqComputeActorWatermarks::HasOutputChannels() const { + return !OutputChannelsWatermarks.empty(); +} + +bool TDqComputeActorWatermarks::NotifyAsyncInputWatermarkReceived(ui64 inputId, TInstant watermark) { + LOG_T("Async input " << inputId << " notified about watermark " << watermark); + + auto& asyncInputWatermark = AsyncInputsWatermarks[inputId]; + if (!asyncInputWatermark || *asyncInputWatermark < watermark) { + LOG_T("Async input " << inputId << " watermark was updated to " << watermark); + asyncInputWatermark = watermark; + RecalcPendingWatermark(); + return true; + } + + return false; +} + +bool TDqComputeActorWatermarks::NotifyInChannelWatermarkReceived(ui64 inputId, TInstant watermark) { + LOG_T("Input channel " << inputId << " notified about watermark " << watermark); + + auto& inputChannelWatermark = InputChannelsWatermarks[inputId]; + if (!inputChannelWatermark || *inputChannelWatermark < watermark) { + LOG_T("Input channel " << inputId << " watermark was updated to " << watermark); + inputChannelWatermark = watermark; + RecalcPendingWatermark(); + return true; + } + + return false; +} + +bool TDqComputeActorWatermarks::NotifyOutputChannelWatermarkSent(ui64 outputId, TInstant watermark) { + auto logPrefix = TStringBuilder() << "Output channel " + << outputId << " notified about watermark '" << watermark << "'"; + + LOG_T(logPrefix); + + if (watermark <= LastWatermark) { + LOG_E(logPrefix << "' when LastWatermark was already forwarded to " << *LastWatermark); + // We will try to ignore this error, but something strange happened + } + + if (watermark != PendingWatermark) { + LOG_E(logPrefix << " when '" << PendingWatermark << "' was expected"); + // We will try to ignore this error, but something strange happened + } + + OutputChannelsWatermarks[outputId] = watermark; + + return MaybePopPendingWatermark(); +} + +bool TDqComputeActorWatermarks::HasPendingWatermark() const { + return PendingWatermark.Defined(); +} + +TMaybe<TInstant> TDqComputeActorWatermarks::GetPendingWatermark() const { + return PendingWatermark; +} + +void TDqComputeActorWatermarks::RecalcPendingWatermark() { + if (AsyncInputsWatermarks.empty() && InputChannelsWatermarks.empty()) { + return; + } + + auto newWatermark = TInstant::Max(); + for (const auto& [_, watermark] : AsyncInputsWatermarks) { + if (!watermark) { + return; + } + + newWatermark = std::min(newWatermark, *watermark); + } + + for (const auto& [_, watermark] : InputChannelsWatermarks) { + if (!watermark) { + return; + } + + newWatermark = std::min(newWatermark, *watermark); + } + + if (!LastWatermark || newWatermark != LastWatermark) { + LOG_T("New pending watermark " << newWatermark); + PendingWatermark = newWatermark; + } +} + +bool TDqComputeActorWatermarks::MaybePopPendingWatermark() { + if (OutputChannelsWatermarks.empty()) { + return true; + } + + if (!PendingWatermark) { + LOG_E("There is no pending watermark, but pop was called"); + // We will try to ignore this error, but something strange happened + return true; + } + + auto outWatermark = TInstant::Max(); + for (const auto& [_, watermark] : OutputChannelsWatermarks) { + if (!watermark) { + return false; + } + + outWatermark = std::min(outWatermark, *watermark); + } + + if (outWatermark >= *PendingWatermark) { + LastWatermark = PendingWatermark; + PopPendingWatermark(); + return true; + } + + return false; +} + +void TDqComputeActorWatermarks::PopPendingWatermark() { + LOG_T("Watermark " << *PendingWatermark << " was popped. "); + PendingWatermark = Nothing(); +} + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h new file mode 100644 index 00000000000..183fbf73dcb --- /dev/null +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_watermarks.h @@ -0,0 +1,52 @@ +#pragma once + +#include <ydb/library/yql/dq/common/dq_common.h> + +#include <library/cpp/actors/core/log.h> + +namespace NYql::NDq { + +class TDqComputeActorWatermarks +{ +public: + TDqComputeActorWatermarks(NActors::TActorIdentity selfId, const TTxId graphId, ui64 taskId); + + void RegisterAsyncInput(ui64 inputId); + void RegisterInputChannel(ui64 inputId); + void RegisterOutputChannel(ui64 outputId); + bool HasOutputChannels() const; + + // Will return true, if local watermark inside this async input was moved forward. + // CA should pause this async input and wait for coresponding watermarks in all other sources/inputs. + bool NotifyAsyncInputWatermarkReceived(ui64 inputId, TInstant watermark); + + // Will return true, if local watermark inside this input channel was moved forward. + // CA should pause this input channel and wait for coresponding watermarks in all other sources/inputs. + bool NotifyInChannelWatermarkReceived(ui64 inputId, TInstant watermark); + + // Will return true, if watermark was sent to all registered outputs. + // CA should resume inputs and sources in this case + bool NotifyOutputChannelWatermarkSent(ui64 outputId, TInstant watermark); + + bool HasPendingWatermark() const; + TMaybe<TInstant> GetPendingWatermark() const; + void PopPendingWatermark(); + +private: + void RecalcPendingWatermark(); + bool MaybePopPendingWatermark(); + +private: + const NActors::TActorIdentity SelfId; + const TTxId TxId; + ui64 TaskId; + + std::unordered_map<ui64, TMaybe<TInstant>> AsyncInputsWatermarks; + std::unordered_map<ui64, TMaybe<TInstant>> InputChannelsWatermarks; + std::unordered_map<ui64, TMaybe<TInstant>> OutputChannelsWatermarks; + + TMaybe<TInstant> PendingWatermark; + TMaybe<TInstant> LastWatermark; +}; + +} // namespace NYql::NDq diff --git a/ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h b/ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h new file mode 100644 index 00000000000..01f8f139393 --- /dev/null +++ b/ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h @@ -0,0 +1,80 @@ +#include <util/datetime/base.h> +#include <util/generic/maybe.h> +#include <util/system/types.h> + +#include <optional> +#include <unordered_map> + +namespace NYql::NDq { + +template <typename TPartitionKey> +struct TDqSourceWatermarkTracker { +public: + TDqSourceWatermarkTracker( + TDuration granularity, + TInstant startWatermark, + ui32 expectedPartitionsCount) + : Granularity(granularity) + , StartWatermark(ToDiscreteTime(startWatermark)) + , ExpectedPartitionsCount(expectedPartitionsCount) {} + + TMaybe<TInstant> NotifyNewPartitionTime(const TPartitionKey& partitionKey, TInstant time) { + auto granularPartitionTime = ToDiscreteTime(time); + + auto iter = Data.find(partitionKey); + if (iter == Data.end()) { + Data[partitionKey] = granularPartitionTime; + return RecalcWatermark(); + } + + if (granularPartitionTime <= iter->second) { + return Nothing(); + } + + iter->second = granularPartitionTime; + return RecalcWatermark(); + } + +private: + TInstant ToDiscreteTime(TInstant time) const { + return TInstant::MicroSeconds(time.MicroSeconds() - time.MicroSeconds() % Granularity.MicroSeconds()); + } + + TMaybe<TInstant> RecalcWatermark() { + if (!Watermark) { + // We have to inject start watermark before first data item, because some graph nodes can't start + // data processing without knowing what the current watermark is. + Watermark = StartWatermark; + return Watermark; + } + + if (Data.size() < ExpectedPartitionsCount) { + // Each partition should notify time at least once before we are able to move watermark + return Nothing(); + } + + auto minTime = Data.begin()->second; + for (const auto& [_, time] : Data) { + if (time < minTime) { + minTime = time; + } + } + + if (minTime > Watermark) { + Watermark = minTime; + return Watermark; + } + + return Nothing(); + } + +private: + const TDuration Granularity; + const TInstant StartWatermark; + const ui32 ExpectedPartitionsCount; + + THashMap<TPartitionKey, TInstant> Data; + TMaybe<TInstant> Watermark; +}; + +} diff --git a/ydb/library/yql/dq/actors/compute/ut/CMakeLists.darwin.txt b/ydb/library/yql/dq/actors/compute/ut/CMakeLists.darwin.txt index 060134b4ccc..653cd18127a 100644 --- a/ydb/library/yql/dq/actors/compute/ut/CMakeLists.darwin.txt +++ b/ydb/library/yql/dq/actors/compute/ut/CMakeLists.darwin.txt @@ -32,6 +32,7 @@ target_link_options(ydb-library-yql-dq-actors-compute-ut PRIVATE ) target_sources(ydb-library-yql-dq-actors-compute-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/ut/dq_compute_issues_buffer_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp ) add_test( NAME diff --git a/ydb/library/yql/dq/actors/compute/ut/CMakeLists.linux.txt b/ydb/library/yql/dq/actors/compute/ut/CMakeLists.linux.txt index 3b6d0fa0462..0169715a20a 100644 --- a/ydb/library/yql/dq/actors/compute/ut/CMakeLists.linux.txt +++ b/ydb/library/yql/dq/actors/compute/ut/CMakeLists.linux.txt @@ -36,6 +36,7 @@ target_link_options(ydb-library-yql-dq-actors-compute-ut PRIVATE ) target_sources(ydb-library-yql-dq-actors-compute-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/ut/dq_compute_issues_buffer_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp ) add_test( NAME diff --git a/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp b/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp new file mode 100644 index 00000000000..c27347d22a6 --- /dev/null +++ b/ydb/library/yql/dq/actors/compute/ut/dq_source_watermark_tracker_ut.cpp @@ -0,0 +1,83 @@ +#include <ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h> + +#include <library/cpp/testing/unittest/registar.h> + +namespace NYql::NDq { + +Y_UNIT_TEST_SUITE(TDqSourceWatermarkTrackerTest) { + Y_UNIT_TEST(StartWatermark) { + TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), TInstant::Seconds(11), 2); + + const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(11)); + UNIT_ASSERT_VALUES_EQUAL(actual1, TInstant::Seconds(10)); + + const auto actual2 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(11)); + UNIT_ASSERT_VALUES_EQUAL(actual2.Defined(), false); // Start watermark was returned already, we shouldn't return it 2-nd time + } + + Y_UNIT_TEST(WatermarkMovement1) { + const auto startWatermark = TInstant::Seconds(10); + + TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2); + + tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12)); + const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(12)); + UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12)); + } + + Y_UNIT_TEST(WatermarkMovement2) { + const auto startWatermark = TInstant::Seconds(10); + + TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2); + + tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12)); + const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(13)); + UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12)); + } + + Y_UNIT_TEST(WatermarkMovement3) { + const auto startWatermark = TInstant::Seconds(10); + + TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2); + + tracker.NotifyNewPartitionTime(0, TInstant::Seconds(13)); + const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(13)); + UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12)); + } + + Y_UNIT_TEST(WatermarkMovement4) { + const auto startWatermark = TInstant::Seconds(10); + + TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2); + + tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12)); + const auto actual = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(14)); + UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(12)); + } + + Y_UNIT_TEST(WatermarkFarMovement) { + const auto startWatermark = TInstant::Seconds(10); + + TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2); + + tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12)); + tracker.NotifyNewPartitionTime(1, TInstant::Seconds(30)); + const auto actual = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(30)); + UNIT_ASSERT_VALUES_EQUAL(actual, TInstant::Seconds(30)); + } + + Y_UNIT_TEST(WaitExpectedPartitionsCount) { + const auto startWatermark = TInstant::Seconds(10); + + TDqSourceWatermarkTracker<ui32> tracker(TDuration::Seconds(2), startWatermark, 2); + + tracker.NotifyNewPartitionTime(0, TInstant::Seconds(12)); + const auto actual1 = tracker.NotifyNewPartitionTime(0, TInstant::Seconds(30)); + UNIT_ASSERT_VALUES_EQUAL(actual1.Defined(), false); // Since expectedPartitionsCount is 2, we shouldn't move watermark + + const auto actual2 = tracker.NotifyNewPartitionTime(1, TInstant::Seconds(30)); + UNIT_ASSERT_VALUES_EQUAL(actual2, TInstant::Seconds(30)); + } +} + +} diff --git a/ydb/library/yql/dq/actors/protos/dq_events.proto b/ydb/library/yql/dq/actors/protos/dq_events.proto index 56183089cf4..3baf18902e8 100644 --- a/ydb/library/yql/dq/actors/protos/dq_events.proto +++ b/ydb/library/yql/dq/actors/protos/dq_events.proto @@ -15,17 +15,23 @@ message TCheckpoint { optional uint64 Generation = 2; }; +message TWatermark { + optional uint64 TimestampUs = 1; +}; + /* Data and control messages will be processed in the following order: 1) Data - 2) Checkpoint - 3) Finished + 2) Watermark + 3) Checkpoint + 4) Finished */ message TChannelData { optional uint64 ChannelId = 1; optional NYql.NDqProto.TData Data = 2; optional bool Finished = 3; optional TCheckpoint Checkpoint = 4; + optional TWatermark Watermark = 5; }; message TEvRun { diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h index f9993c12090..8fbb410b1ec 100644 --- a/ydb/library/yql/dq/actors/task_runner/events.h +++ b/ydb/library/yql/dq/actors/task_runner/events.h @@ -199,6 +199,7 @@ struct TEvTaskRunFinished const TDqMemoryQuota::TProfileStats& profileStats = {}, ui64 mkqlMemoryLimit = 0, THolder<NDqProto::TMiniKqlProgramState>&& programState = nullptr, + bool watermarkInjectedToOutputs = false, bool checkpointRequestedFromTaskRunner = false, TDuration computeTime = TDuration::Zero()) : RunStatus(runStatus) @@ -208,6 +209,7 @@ struct TEvTaskRunFinished , ProfileStats(profileStats) , MkqlMemoryLimit(mkqlMemoryLimit) , ProgramState(std::move(programState)) + , WatermarkInjectedToOutputs(watermarkInjectedToOutputs) , CheckpointRequestedFromTaskRunner(checkpointRequestedFromTaskRunner) , ComputeTime(computeTime) { } @@ -220,13 +222,16 @@ struct TEvTaskRunFinished TDqMemoryQuota::TProfileStats ProfileStats; ui64 MkqlMemoryLimit = 0; THolder<NDqProto::TMiniKqlProgramState> ProgramState; + bool WatermarkInjectedToOutputs = false; bool CheckpointRequestedFromTaskRunner = false; TDuration ComputeTime; }; struct TEvChannelPopFinished - : NActors::TEventLocal<TEvChannelPopFinished, TTaskRunnerEvents::ES_POP_FINISHED> { + : NActors::TEventLocal<TEvChannelPopFinished, TTaskRunnerEvents::ES_POP_FINISHED> +{ TEvChannelPopFinished() = default; + TEvChannelPopFinished(ui32 channelId) : Stats() , ChannelId(channelId) @@ -234,11 +239,21 @@ struct TEvChannelPopFinished , Finished(false) , Changed(false) { } - TEvChannelPopFinished(ui32 channelId, TVector<NDqProto::TData>&& data, TMaybe<NDqProto::TCheckpoint>&& checkpoint, bool finished, bool changed, const TTaskRunnerActorSensors& sensors = {}, TDqTaskRunnerStatsView&& stats = {}) + + TEvChannelPopFinished( + ui32 channelId, + TVector<NDqProto::TData>&& data, + TMaybe<NDqProto::TWatermark>&& watermark, + TMaybe<NDqProto::TCheckpoint>&& checkpoint, + bool finished, + bool changed, + const TTaskRunnerActorSensors& sensors = {}, + TDqTaskRunnerStatsView&& stats = {}) : Sensors(sensors) , Stats(std::move(stats)) , ChannelId(channelId) , Data(std::move(data)) + , Watermark(std::move(watermark)) , Checkpoint(std::move(checkpoint)) , Finished(finished) , Changed(changed) @@ -248,12 +263,24 @@ struct TEvChannelPopFinished NDq::TDqTaskRunnerStatsView Stats; const ui32 ChannelId; + // The order is Data -> Watermark -> Checkpoint TVector<NDqProto::TData> Data; - TMaybe<NDqProto::TCheckpoint> Checkpoint; // checkpoint follows the last data in this->Data (if it is not empty) + TMaybe<NDqProto::TWatermark> Watermark; + TMaybe<NDqProto::TCheckpoint> Checkpoint; bool Finished; bool Changed; }; +struct TWatermarkRequest { + TWatermarkRequest(TVector<ui32>&& channelIds, TInstant watermark) + : ChannelIds(std::move(channelIds)) + , Watermark(watermark) { + } + + TVector<ui32> ChannelIds; + TInstant Watermark; +}; + // Holds info required to inject barriers to outputs struct TCheckpointRequest { TCheckpointRequest(TVector<ui32>&& channelIds, TVector<ui32>&& sinkIds, const NDqProto::TCheckpoint& checkpoint) @@ -272,10 +299,15 @@ struct TEvContinueRun TEvContinueRun() = default; - explicit TEvContinueRun(TMaybe<TCheckpointRequest>&& checkpointRequest, bool checkpointOnly) + explicit TEvContinueRun( + TMaybe<TWatermarkRequest>&& watermarkRequest, + TMaybe<TCheckpointRequest>&& checkpointRequest, + bool checkpointOnly + ) : ChannelId(0) , MemLimit(0) , FreeSpace(0) + , WatermarkRequest(std::move(watermarkRequest)) , CheckpointRequest(std::move(checkpointRequest)) , CheckpointOnly(checkpointOnly) { } @@ -299,6 +331,7 @@ struct TEvContinueRun const THashSet<ui32> InputChannels; ui64 MemLimit; ui64 FreeSpace; + TMaybe<TWatermarkRequest> WatermarkRequest = Nothing(); TMaybe<TCheckpointRequest> CheckpointRequest = Nothing(); bool CheckpointOnly = false; }; diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index c8ac3afd65d..7ac6dafaa6b 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -20,6 +20,7 @@ #include <util/generic/queue.h> #define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "SelfId: " << SelfId() << ", TxId: " << TxId << ", task: " << TaskId << ". " << stream); +#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "SelfId: " << SelfId() << ", TxId: " << TxId << ", task: " << TaskId << ". " << stream); #define LOG_I(stream) LOG_INFO_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "SelfId: " << SelfId() << ", TxId: " << TxId << ", task: " << TaskId << ". " << stream); #define LOG_D(stream) LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "SelfId: " << SelfId() << ", TxId: " << TxId << ", task: " << TaskId << ". " << stream); #define LOG_T(stream) LOG_TRACE_S(*TlsActivationContext, NKikimrServices::YQL_PROXY, "SelfId: " << SelfId() << ", TxId: " << TxId << ", task: " << TaskId << ". " << stream); @@ -144,10 +145,21 @@ private: NYql::NDq::ERunStatus res = ERunStatus::Finished; THashMap<ui32, ui64> inputChannelFreeSpace; THashMap<ui32, ui64> sourcesFreeSpace; + + const bool shouldHandleWatermark = ev->Get()->WatermarkRequest.Defined() + && ev->Get()->WatermarkRequest->Watermark > TaskRunner->GetWatermark().WatermarkIn; + if (!ev->Get()->CheckpointOnly) { + if (shouldHandleWatermark) { + const auto watermark = ev->Get()->WatermarkRequest->Watermark; + LOG_T("Task runner. Inject watermark " << watermark); + TaskRunner->SetWatermarkIn(watermark); + } + res = TaskRunner->Run(); LOG_T("Resume execution, run status: " << res); } + if (res == ERunStatus::PendingInput) { for (auto& channelId : inputMap) { inputChannelFreeSpace[channelId] = TaskRunner->GetInputChannel(channelId)->GetFreeSpace(); @@ -158,25 +170,38 @@ private: } } + auto watermarkInjectedToOutputs = false; THolder<NDqProto::TMiniKqlProgramState> mkqlProgramState; - if ((res == ERunStatus::PendingInput || res == ERunStatus::Finished) && ev->Get()->CheckpointRequest.Defined() && ReadyToCheckpoint()) { - mkqlProgramState = MakeHolder<NDqProto::TMiniKqlProgramState>(); - try { - mkqlProgramState->SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0); - NDqProto::TStateData::TData& data = *mkqlProgramState->MutableData()->MutableStateData(); - data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion); - data.SetBlob(TaskRunner->Save()); - // inject barriers - // todo:(whcrc) barriers are injected even if source state save failed - for (const auto& channelId : ev->Get()->CheckpointRequest->ChannelIds) { - TaskRunner->GetOutputChannel(channelId)->Push(NDqProto::TCheckpoint(ev->Get()->CheckpointRequest->Checkpoint)); + if (res == ERunStatus::PendingInput || res == ERunStatus::Finished) { + if (shouldHandleWatermark) { + for (const auto& channelId : ev->Get()->WatermarkRequest->ChannelIds) { + NDqProto::TWatermark watermark; + watermark.SetTimestampUs(ev->Get()->WatermarkRequest->Watermark.MicroSeconds()); + TaskRunner->GetOutputChannel(channelId)->Push(std::move(watermark)); } - for (const auto& sinkId : ev->Get()->CheckpointRequest->SinkIds) { - TaskRunner->GetSink(sinkId)->Push(NDqProto::TCheckpoint(ev->Get()->CheckpointRequest->Checkpoint)); + + watermarkInjectedToOutputs = true; + } + + if (ev->Get()->CheckpointRequest.Defined() && ReadyToCheckpoint()) { + mkqlProgramState = MakeHolder<NDqProto::TMiniKqlProgramState>(); + try { + mkqlProgramState->SetRuntimeVersion(NDqProto::RUNTIME_VERSION_YQL_1_0); + NDqProto::TStateData::TData& data = *mkqlProgramState->MutableData()->MutableStateData(); + data.SetVersion(TDqComputeActorCheckpoints::ComputeActorCurrentStateVersion); + data.SetBlob(TaskRunner->Save()); + // inject barriers + // todo:(whcrc) barriers are injected even if source state save failed + for (const auto& channelId : ev->Get()->CheckpointRequest->ChannelIds) { + TaskRunner->GetOutputChannel(channelId)->Push(NDqProto::TCheckpoint(ev->Get()->CheckpointRequest->Checkpoint)); + } + for (const auto& sinkId : ev->Get()->CheckpointRequest->SinkIds) { + TaskRunner->GetSink(sinkId)->Push(NDqProto::TCheckpoint(ev->Get()->CheckpointRequest->Checkpoint)); + } + } catch (const std::exception& e) { + LOG_E("Failed to save state: " << e.what()); + mkqlProgramState = nullptr; } - } catch (const std::exception& e) { - LOG_E("Failed to save state: " << e.what()); - mkqlProgramState = nullptr; } } @@ -194,6 +219,7 @@ private: MemoryQuota ? *MemoryQuota->GetProfileStats() : TDqMemoryQuota::TProfileStats(), MemoryQuota ? MemoryQuota->GetMkqlMemoryLimit() : 0, std::move(mkqlProgramState), + watermarkInjectedToOutputs, ev->Get()->CheckpointRequest.Defined(), TInstant::Now() - start), /*flags=*/0, @@ -281,22 +307,31 @@ private: } TVector<NDqProto::TData> chunks; + TMaybe<NDqProto::TWatermark> watermark = Nothing(); TMaybe<NDqProto::TCheckpoint> checkpoint = Nothing(); for (;maxChunks && remain > 0 && !isFinished && hasData; maxChunks--, remain -= dataSize) { NDqProto::TData data; hasData = channel->Pop(data, remain); + + NDqProto::TWatermark poppedWatermark; + bool hasWatermark = channel->Pop(poppedWatermark); + NDqProto::TCheckpoint poppedCheckpoint; bool hasCheckpoint = channel->Pop(poppedCheckpoint); + dataSize = data.GetRaw().size(); isFinished = !hasData && channel->IsFinished(); - changed = changed || hasData || hasCheckpoint || (isFinished != wasFinished); + changed = changed || hasData || hasWatermark || hasCheckpoint || (isFinished != wasFinished); if (hasData) { chunks.emplace_back(std::move(data)); } + + watermark = hasWatermark ? std::move(poppedWatermark) : TMaybe<NDqProto::TWatermark>(); + checkpoint = hasCheckpoint ? std::move(poppedCheckpoint) : TMaybe<NDqProto::TCheckpoint>(); + if (hasCheckpoint) { - checkpoint = std::move(poppedCheckpoint); ResumeInputs(); break; } @@ -307,6 +342,7 @@ private: new TEvChannelPopFinished( channelId, std::move(chunks), + std::move(watermark), std::move(checkpoint), isFinished, changed, diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index 5ac9c3fe850..0e7cfbdea19 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -20,6 +20,14 @@ enum ECheckpointingMode { CHECKPOINTING_MODE_DISABLED = 1; // Checkpoints are not used in this part of graph (channels, tasks). This is typically when we are deadling with finite part of graph. } +// Mode of watermarks support. +// There can be different watermarks settings +// in different graph parts. +enum EWatermarksMode { + WATERMARKS_MODE_DEFAULT = 0; // Watermarks are used according to common settings for our type of query. + WATERMARKS_MODE_DISABLED = 1; // Watermarks are not used in this part of graph (channels, tasks). +} + message TProgram { message TSettings { bool HasMapJoin = 1; @@ -58,6 +66,7 @@ message TChannel { bool IsPersistent = 7; bool InMemory = 8; ECheckpointingMode CheckpointingMode = 9; + EWatermarksMode WatermarksMode = 10; } message TUnionAllInput { @@ -66,6 +75,7 @@ message TUnionAllInput { message TSourceInput { string Type = 1; google.protobuf.Any Settings = 2; + EWatermarksMode WatermarksMode = 3; } message TSortColumn { diff --git a/ydb/library/yql/dq/runtime/dq_async_output.cpp b/ydb/library/yql/dq/runtime/dq_async_output.cpp index 38bd1999baa..5c8b38054ed 100644 --- a/ydb/library/yql/dq/runtime/dq_async_output.cpp +++ b/ydb/library/yql/dq/runtime/dq_async_output.cpp @@ -11,7 +11,7 @@ namespace { class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer { struct TValueDesc { - std::variant<NUdf::TUnboxedValue, NDqProto::TCheckpoint> Value; + std::variant<NUdf::TUnboxedValue, NDqProto::TWatermark, NDqProto::TCheckpoint> Value; ui64 EstimatedSize; TValueDesc(NUdf::TUnboxedValue&& value, ui64 size) @@ -20,6 +20,12 @@ class TDqAsyncOutputBuffer : public IDqAsyncOutputBuffer { { } + TValueDesc(NDqProto::TWatermark&& watermark, ui64 size) + : Value(std::move(watermark)) + , EstimatedSize(size) + { + } + TValueDesc(NDqProto::TCheckpoint&& checkpoint, ui64 size) : Value(std::move(checkpoint)) , EstimatedSize(size) @@ -61,6 +67,14 @@ public: ReportChunkIn(); } + void Push(NDqProto::TWatermark&& watermark) override { + const ui64 bytesSize = watermark.ByteSize(); + Values.emplace_back(std::move(watermark), bytesSize); + EstimatedStoredBytes += bytesSize; + + ReportChunkIn(); + } + void Push(NDqProto::TCheckpoint&& checkpoint) override { const ui64 bytesSize = checkpoint.ByteSize(); Values.emplace_back(std::move(checkpoint), bytesSize); @@ -105,6 +119,21 @@ public: return usedBytes; } + bool Pop(NDqProto::TWatermark& watermark) override { + if (!Values.empty() && std::holds_alternative<NDqProto::TWatermark>(Values.front().Value)) { + watermark = std::move(std::get<NDqProto::TWatermark>(Values.front().Value)); + const auto size = Values.front().EstimatedSize; + Y_VERIFY(EstimatedStoredBytes >= size); + EstimatedStoredBytes -= size; + Values.pop_front(); + + ReportChunkOut(1, size); + + return true; + } + return false; + } + bool Pop(NDqProto::TCheckpoint& checkpoint) override { if (!Values.empty() && std::holds_alternative<NDqProto::TCheckpoint>(Values.front().Value)) { checkpoint = std::move(std::get<NDqProto::TCheckpoint>(Values.front().Value)); diff --git a/ydb/library/yql/dq/runtime/dq_async_output.h b/ydb/library/yql/dq/runtime/dq_async_output.h index 931957a7208..86e57b6e58c 100644 --- a/ydb/library/yql/dq/runtime/dq_async_output.h +++ b/ydb/library/yql/dq/runtime/dq_async_output.h @@ -33,6 +33,9 @@ public: // Pop data to send. Return estimated size of returned data. [[nodiscard]] virtual ui64 Pop(NKikimr::NMiniKQL::TUnboxedValueVector& batch, ui64 bytes) = 0; + // Pop watermark + [[nodiscard]] + virtual bool Pop(NDqProto::TWatermark& watermark) = 0; // Pop chechpoint. Checkpoints may be taken from sink even after it is finished. [[nodiscard]] virtual bool Pop(NDqProto::TCheckpoint& checkpoint) = 0; diff --git a/ydb/library/yql/dq/runtime/dq_output.h b/ydb/library/yql/dq/runtime/dq_output.h index e793ff5231b..c0251410782 100644 --- a/ydb/library/yql/dq/runtime/dq_output.h +++ b/ydb/library/yql/dq/runtime/dq_output.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/minikql/mkql_node.h> #include <util/datetime/base.h> @@ -42,6 +43,7 @@ public: virtual bool IsFull() const = 0; // can throw TDqChannelStorageException virtual void Push(NUdf::TUnboxedValue&& value) = 0; + virtual void Push(NDqProto::TWatermark&& watermark) = 0; // Push checkpoint. Checkpoints may be pushed to channel even after it is finished. virtual void Push(NDqProto::TCheckpoint&& checkpoint) = 0; virtual void Finish() = 0; diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp index 88d23c8b38e..dbc85682740 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp @@ -125,6 +125,11 @@ public: } } + void Push(NDqProto::TWatermark&& watermark) override { + YQL_ENSURE(!Watermark); + Watermark.ConstructInPlace(std::move(watermark)); + } + void Push(NDqProto::TCheckpoint&& checkpoint) override { YQL_ENSURE(!Checkpoint); Checkpoint.ConstructInPlace(std::move(checkpoint)); @@ -209,6 +214,16 @@ public: } [[nodiscard]] + bool Pop(NDqProto::TWatermark& watermark) override { + if (!HasData() && Watermark) { + watermark = std::move(*Watermark); + Watermark = Nothing(); + return true; + } + return false; + } + + [[nodiscard]] bool Pop(NDqProto::TCheckpoint& checkpoint) override { if (!HasData() && Checkpoint) { checkpoint = std::move(*Checkpoint); @@ -313,6 +328,7 @@ private: ui32 EstimatedRowBytes = 0; bool Finished = false; + TMaybe<NDqProto::TWatermark> Watermark; TMaybe<NDqProto::TCheckpoint> Checkpoint; }; @@ -549,6 +565,11 @@ public: #endif } + void Push(NDqProto::TWatermark&& watermark) override { + YQL_ENSURE(!Watermark); + Watermark.ConstructInPlace(std::move(watermark)); + } + void Push(NDqProto::TCheckpoint&& checkpoint) override { YQL_ENSURE(!Checkpoint); Checkpoint.ConstructInPlace(std::move(checkpoint)); @@ -673,6 +694,16 @@ public: } [[nodiscard]] + bool Pop(NDqProto::TWatermark& watermark) override { + if (!HasData() && Watermark) { + watermark = std::move(*Watermark); + Watermark = Nothing(); + return true; + } + return false; + } + + [[nodiscard]] bool Pop(NDqProto::TCheckpoint& checkpoint) override { if (!HasData() && Checkpoint) { checkpoint = std::move(*Checkpoint); @@ -830,6 +861,7 @@ private: bool Finished = false; + TMaybe<NDqProto::TWatermark> Watermark; TMaybe<NDqProto::TCheckpoint> Checkpoint; }; diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.h b/ydb/library/yql/dq/runtime/dq_output_channel.h index 0adee34614e..8c74f3b71b6 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.h +++ b/ydb/library/yql/dq/runtime/dq_output_channel.h @@ -52,6 +52,9 @@ public: // can throw TDqChannelStorageException [[nodiscard]] virtual bool Pop(NDqProto::TData& data, ui64 bytes) = 0; + // Pop watermark. + [[nodiscard]] + virtual bool Pop(NDqProto::TWatermark& watermark) = 0; // Pop chechpoint. Checkpoints may be taken from channel even after it is finished. [[nodiscard]] virtual bool Pop(NDqProto::TCheckpoint& checkpoint) = 0; diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 0e36cd68d43..540db4aae17 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -684,10 +684,14 @@ public: return TaskHasEffects; } - void SetWatermark(TInstant time) { + void SetWatermarkIn(TInstant time) override { Watermark.WatermarkIn = std::move(time); } + const NKikimr::NMiniKQL::TWatermark& GetWatermark() const override { + return Watermark; + } + IDqInputChannel::TPtr GetInputChannel(ui64 channelId) override { auto ptr = InputChannels.FindPtr(channelId); YQL_ENSURE(ptr, "task: " << TaskId << " does not have input channelId: " << channelId); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index 710f4e5cf5a..9f24d82fc27 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -12,8 +12,9 @@ #include <ydb/library/yql/minikql/computation/mkql_computation_pattern_cache.h> #include <ydb/library/yql/minikql/mkql_alloc.h> #include <ydb/library/yql/minikql/mkql_function_registry.h> -#include <ydb/library/yql/minikql/mkql_node.h> #include <ydb/library/yql/minikql/mkql_node_visitor.h> +#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/minikql/mkql_watermark.h> #include <library/cpp/monlib/metrics/histogram_collector.h> @@ -326,6 +327,9 @@ public: [[nodiscard]] virtual TString Save() const = 0; virtual void Load(TStringBuf in) = 0; + + virtual void SetWatermarkIn(TInstant time) = 0; + virtual const NKikimr::NMiniKQL::TWatermark& GetWatermark() const = 0; }; TIntrusivePtr<IDqTaskRunner> MakeDqTaskRunner(const TDqTaskRunnerContext& ctx, const TDqTaskRunnerSettings& settings, diff --git a/ydb/library/yql/dq/tasks/dq_tasks_graph.h b/ydb/library/yql/dq/tasks/dq_tasks_graph.h index 780f5009036..8cb87b01558 100644 --- a/ydb/library/yql/dq/tasks/dq_tasks_graph.h +++ b/ydb/library/yql/dq/tasks/dq_tasks_graph.h @@ -74,6 +74,7 @@ struct TChannel { ui32 DstInputIndex = 0; bool InMemory = true; NDqProto::ECheckpointingMode CheckpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT; + NDqProto::EWatermarksMode WatermarksMode = NDqProto::WATERMARKS_MODE_DISABLED; }; using TChannelList = TVector<ui64>; @@ -117,6 +118,7 @@ struct TTaskInput { TChannelList Channels; TMaybe<::google::protobuf::Any> SourceSettings; TString SourceType; + NYql::NDqProto::EWatermarksMode WatermarksMode = NYql::NDqProto::EWatermarksMode::WATERMARKS_MODE_DISABLED; TInputMeta Meta; TMaybe<TTransform> Transform; @@ -167,6 +169,7 @@ struct TTask { NActors::TActorId ComputeActorId; TTaskMeta Meta; NDqProto::ECheckpointingMode CheckpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT; + NDqProto::EWatermarksMode WatermarksMode = NDqProto::WATERMARKS_MODE_DISABLED; }; template <class TStageInfoMeta, class TTaskMeta, class TInputMeta, class TOutputMeta> diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp index 585507da050..47df2509ce7 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.cpp @@ -44,11 +44,11 @@ public: ui64 intervalHopCount, ui64 delayHopCount, bool dataWatermarks, + bool watermarkMode, TComputationContext& ctx, const THashFunc& hash, const TEqualsFunc& equal, - TWatermark& watermark, - bool watermarkMode) + TWatermark& watermark) : TBase(memInfo) , Stream(std::move(stream)) , Self(self) @@ -60,7 +60,7 @@ public: , Watermark(watermark) , WatermarkMode(watermarkMode) { - if (dataWatermarks) { + if (!watermarkMode && dataWatermarks) { DataWatermarkTracker.emplace(TWatermarkTracker(delayHopCount * hopTime, hopTime)); } } @@ -238,11 +238,11 @@ public: auto& keyState = GetOrCreateKeyState(key, WatermarkMode ? GetWatermark().MicroSeconds() / HopTime : hopIndex); if (hopIndex < keyState.HopIndex) { - ++EarlyEventsThrown; + ++LateEventsThrown; continue; } if (WatermarkMode && (hopIndex >= keyState.HopIndex + DelayHopCount + IntervalHopCount)) { - ++LateEventsThrown; + ++EarlyEventsThrown; continue; } @@ -429,10 +429,10 @@ public: IComputationNode* interval, IComputationNode* delay, IComputationNode* dataWatermarks, + IComputationNode* watermarkMode, TType* keyType, TType* stateType, - TWatermark& watermark, - bool watermarkMode) + TWatermark& watermark) : TBaseComputation(mutables) , Stream(stream) , Item(item) @@ -454,6 +454,7 @@ public: , Interval(interval) , Delay(delay) , DataWatermarks(dataWatermarks) + , WatermarkMode(watermarkMode) , KeyType(keyType) , StateType(stateType) , KeyPacker(mutables) @@ -462,7 +463,6 @@ public: , IsTuple(false) , UseIHash(false) , Watermark(watermark) - , WatermarkMode(watermarkMode) { Stateless = false; bool encoded; @@ -475,6 +475,7 @@ public: const auto interval = Interval->GetValue(ctx).Get<i64>(); const auto delay = Delay->GetValue(ctx).Get<i64>(); const auto dataWatermarks = DataWatermarks->GetValue(ctx).Get<bool>(); + const auto watermarkMode = WatermarkMode->GetValue(ctx).Get<bool>(); // TODO: move checks from here MKQL_ENSURE(hopTime > 0, "hop must be positive"); @@ -489,10 +490,10 @@ public: return ctx.HolderFactory.Create<TStreamValue>(Stream->GetValue(ctx), this, (ui64)hopTime, (ui64)intervalHopCount, (ui64)delayHopCount, - dataWatermarks, ctx, + dataWatermarks, watermarkMode, ctx, TValueHasher(KeyTypes, IsTuple, UseIHash ? MakeHashImpl(KeyType) : nullptr), TValueEqual(KeyTypes, IsTuple, UseIHash ? MakeEquateImpl(KeyType) : nullptr), - Watermark, WatermarkMode); + Watermark); } NUdf::TUnboxedValue GetValue(TComputationContext& compCtx) const override { @@ -532,6 +533,7 @@ private: DependsOn(Interval); DependsOn(Delay); DependsOn(DataWatermarks); + DependsOn(WatermarkMode); } IComputationNode* const Stream; @@ -557,6 +559,7 @@ private: IComputationNode* const Interval; IComputationNode* const Delay; IComputationNode* const DataWatermarks; + IComputationNode* const WatermarkMode; TType* const KeyType; TType* const StateType; @@ -567,13 +570,12 @@ private: bool IsTuple; bool UseIHash; TWatermark& Watermark; - bool WatermarkMode; }; } -IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx, TWatermark& watermark, bool watermarkMode) { - MKQL_ENSURE(callable.GetInputsCount() == 20, "Expected 20 args"); +IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx, TWatermark& watermark) { + MKQL_ENSURE(callable.GetInputsCount() == 21, "Expected 21 args"); auto hasSaveLoad = !callable.GetInput(12).GetStaticType()->IsVoid(); @@ -604,6 +606,7 @@ IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNo auto interval = LocateNode(ctx.NodeLocator, callable, 17); auto delay = LocateNode(ctx.NodeLocator, callable, 18); auto dataWatermarks = LocateNode(ctx.NodeLocator, callable, 19); + auto watermarkMode = LocateNode(ctx.NodeLocator, callable, 20); auto item = LocateExternalNode(ctx.NodeLocator, callable, 1); auto key = LocateExternalNode(ctx.NodeLocator, callable, 2); @@ -620,7 +623,7 @@ IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNo return new TMultiHoppingCoreWrapper(ctx.Mutables, stream, item, key, state, state2, time, inSave, inLoad, keyExtract, outTime, outInit, outUpdate, outSave, outLoad, outMerge, outFinish, - hop, interval, delay, dataWatermarks, keyType, stateType, watermark, watermarkMode); + hop, interval, delay, dataWatermarks, watermarkMode, keyType, stateType, watermark); } } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h index 02286aab567..603ea9207ae 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_multihopping.h @@ -1,16 +1,12 @@ #pragma once #include <ydb/library/yql/minikql/computation/mkql_computation_node.h> +#include <ydb/library/yql/minikql/mkql_watermark.h> namespace NKikimr { namespace NMiniKQL { - -struct TWatermark { - TInstant WatermarkIn; -}; - -IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx, TWatermark& watermark, bool watermarkMode = false); +IComputationNode* WrapMultiHoppingCore(TCallable& callable, const TComputationNodeFactoryContext& ctx, TWatermark& watermark); } } diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp index 8bc4a67c2cf..7959e5e601a 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_saveload_ut.cpp @@ -175,7 +175,8 @@ namespace { pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&interval, sizeof(interval))), // interval pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&delay, sizeof(delay))), // delay - pgmBuilder.NewDataLiteral<bool>(dataWatermarks) // dataWatermarks + pgmBuilder.NewDataLiteral<bool>(dataWatermarks), // dataWatermarks + pgmBuilder.NewDataLiteral<bool>(false) ); auto graph = setup.BuildGraph(pgmReturn, {streamNode}); diff --git a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp index 27517269fe4..61a1ea44aea 100644 --- a/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp +++ b/ydb/library/yql/minikql/comp_nodes/ut/mkql_multihopping_ut.cpp @@ -64,12 +64,12 @@ namespace { return CreateDeterministicTimeProvider(10000000); } - TComputationNodeFactory GetAuxCallableFactory(TWatermark& watermark, bool watermarkMode = false) { - return [&watermark, watermarkMode](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { + TComputationNodeFactory GetAuxCallableFactory(TWatermark& watermark) { + return [&watermark](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { if (callable.GetType()->GetName() == "MyStream") { return new TExternalComputationNode(ctx.Mutables); } else if (callable.GetType()->GetName() == "MultiHoppingCore") { - return WrapMultiHoppingCore(callable, ctx, watermark, watermarkMode); + return WrapMultiHoppingCore(callable, ctx, watermark); } return GetBuiltinFactory()(callable, ctx); @@ -92,7 +92,7 @@ namespace { THolder<IComputationGraph> BuildGraph(TRuntimeNode pgm, const std::vector<TNode*>& entryPoints = std::vector<TNode*>()) { Explorer.Walk(pgm.GetNode(), *Env); - TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(Watermark, WatermarkMode), + TComputationPatternOpts opts(Alloc.Ref(), *Env, GetAuxCallableFactory(Watermark), FunctionRegistry.Get(), NUdf::EValidateMode::None, NUdf::EValidatePolicy::Fail, "OFF", EGraphPerProcess::Multi, StatsResgistry.Get()); @@ -217,7 +217,8 @@ namespace { pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&hop, sizeof(hop))), // hop pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&interval, sizeof(interval))), // interval pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Interval>(NUdf::TStringRef((const char*)&delay, sizeof(delay))), // delay - pgmBuilder.NewDataLiteral<bool>(dataWatermarks) // dataWatermarks + pgmBuilder.NewDataLiteral<bool>(dataWatermarks), + pgmBuilder.NewDataLiteral<bool>(setup.WatermarkMode) ); auto graph = setup.BuildGraph(pgmReturn, {streamNode}); diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 4dfedcc84cd..3fcadf95362 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -4759,7 +4759,8 @@ TRuntimeNode TProgramBuilder::MultiHoppingCore(TRuntimeNode list, const TUnaryLambda& load, const TBinaryLambda& merge, const TTernaryLambda& finish, - TRuntimeNode hop, TRuntimeNode interval, TRuntimeNode delay, TRuntimeNode dataWatermarks) + TRuntimeNode hop, TRuntimeNode interval, TRuntimeNode delay, + TRuntimeNode dataWatermarks, TRuntimeNode watermarksMode) { if constexpr (RuntimeVersion < 22U) { THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; @@ -4829,6 +4830,7 @@ TRuntimeNode TProgramBuilder::MultiHoppingCore(TRuntimeNode list, callableBuilder.Add(interval); callableBuilder.Add(delay); callableBuilder.Add(dataWatermarks); + callableBuilder.Add(watermarksMode); return TRuntimeNode(callableBuilder.Build(), false); } diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 72112c9d1e5..d514255b2d0 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -441,7 +441,8 @@ public: const TUnaryLambda& load, const TBinaryLambda& merge, const TTernaryLambda& finish, - TRuntimeNode hop, TRuntimeNode interval, TRuntimeNode delay, TRuntimeNode dataWatermarks); + TRuntimeNode hop, TRuntimeNode interval, TRuntimeNode delay, + TRuntimeNode dataWatermarks, TRuntimeNode watermarksMode); TRuntimeNode Chopper(TRuntimeNode flow, const TUnaryLambda& keyExtractor, const TBinaryLambda& groupSwitch, const TBinaryLambda& groupHandler); diff --git a/ydb/library/yql/minikql/mkql_watermark.h b/ydb/library/yql/minikql/mkql_watermark.h new file mode 100644 index 00000000000..8a5805bd536 --- /dev/null +++ b/ydb/library/yql/minikql/mkql_watermark.h @@ -0,0 +1,12 @@ +#pragma once + +#include "util/datetime/base.h" + +namespace NKikimr { +namespace NMiniKQL { + +struct TWatermark { + TInstant WatermarkIn; +}; + +}} diff --git a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp index aaebb97f036..94c558d868d 100644 --- a/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp +++ b/ydb/library/yql/providers/clickhouse/actors/yql_ch_read_actor.cpp @@ -91,7 +91,7 @@ private: } } - i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 /*freeSpace*/) final { + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>&, bool& finished, i64 /*freeSpace*/) final { if (Result) { const auto size = Result->size(); buffer.emplace_back(NKikimr::NMiniKQL::MakeString(std::string_view(*Result))); diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index ce81e578174..2025958d96c 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -1648,9 +1648,11 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return MkqlBuildLambda(*node.Child(12), ctx, {key, state, time}); }; + const auto watermarksMode = ctx.ProgramBuilder.NewDataLiteral(FromString<bool>(*node.Child(13), NUdf::EDataSlot::Bool)); + return ctx.ProgramBuilder.MultiHoppingCore( stream, keyExtractor, timeExtractor, init, update, save, load, merge, finish, - hop, interval, delay, dataWatermarks); + hop, interval, delay, dataWatermarks, watermarksMode); }); AddCallable("ToDict", [](const TExprNode& node, TMkqlBuildContext& ctx) { diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp index f036dee78f4..3d680abf16e 100644 --- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp +++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.cpp @@ -91,8 +91,12 @@ TFakeCASetup::TFakeCASetup() NActors::TMailboxType::Simple, 0)); + Runtime->SetLogBackend(CreateStderrBackend()); + TAutoPtr<NKikimr::TAppPrepare> app = new NKikimr::TAppPrepare(); Runtime->Initialize(app->Unwrap()); + + Runtime->SetLogPriority(NKikimrServices::KQP_COMPUTE, NActors::NLog::EPriority::PRI_TRACE); } TFakeCASetup::~TFakeCASetup() { diff --git a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h index 12a1af40af1..a4f6c8ea9e5 100644 --- a/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h +++ b/ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h @@ -182,44 +182,64 @@ struct TFakeCASetup { ~TFakeCASetup(); template<typename T> - std::vector<T> AsyncInputRead(const TReadValueParser<T> parser, i64 freeSpace = 12345) { - std::vector<T> result; - Execute([&result, &parser, freeSpace](TFakeActor& actor) { + std::vector<std::variant<T, TInstant>> AsyncInputRead( + const TReadValueParser<T> parser, + NThreading::TFuture<void>& nextDataFutureOut, + i64 freeSpace = 12345) + { + std::vector<std::variant<T, TInstant>> result; + NThreading::TFuture<bool> nextDataFuture; + Execute([&result, &parser, freeSpace, &nextDataFutureOut, this](TFakeActor& actor) { + TMaybe<TInstant> watermark; NKikimr::NMiniKQL::TUnboxedValueVector buffer; bool finished = false; - actor.DqAsyncInput->GetAsyncInputData(buffer, finished, freeSpace); + actor.DqAsyncInput->GetAsyncInputData(buffer, watermark, finished, freeSpace); for (const auto& uv : buffer) { for (const auto item : parser(uv)) { result.emplace_back(item); } } + + if (watermark) { + result.emplace_back(*watermark); + } + + nextDataFutureOut = AsyncInputPromises.NewAsyncInputDataArrived.GetFuture(); }); return result; } template<typename T> - std::vector<T> AsyncInputReadUntil( + std::vector<std::variant<T, TInstant>> AsyncInputReadUntil( const TReadValueParser<T> parser, ui64 size, i64 eachReadFreeSpace = 1000, - TDuration timeout = TDuration::Seconds(10)) + TDuration timeout = TDuration::Seconds(30)) { - std::vector<T> result; + std::vector<std::variant<T, TInstant>> result; + TInstant startedAt = TInstant::Now(); DoWithRetry([&](){ - auto batch = AsyncInputRead<T>(parser, eachReadFreeSpace); + NThreading::TFuture<void> nextDataFuture; + auto batch = AsyncInputRead<T>(parser, nextDataFuture, eachReadFreeSpace); for (const auto& item : batch) { result.emplace_back(item); } + if (TInstant::Now() > startedAt + timeout) { + return; + } + if (result.size() < size) { - AsyncInputPromises.NewAsyncInputDataArrived.GetFuture().Wait(timeout); + nextDataFuture.Wait(timeout); ythrow yexception() << "Not enough data"; } }, - TRetryOptions(3), - false); + TRetryOptions(std::numeric_limits<ui32>::max()), + true); + + UNIT_ASSERT_EQUAL_C(result.size(), size, "Waited for " << size << " items but only " << result.size() << " received"); return result; } diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index f012a43ed10..d22597522dd 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -265,7 +265,7 @@ private: void OnFailure(TEvDqFailure::TPtr& ev, const NActors::TActorContext&) { if (!Finished) { YQL_LOG_CTX_ROOT_SCOPE(TraceId); - YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ + YQL_CLOG(DEBUG, ProviderDq) << __FUNCTION__ << ", status=" << static_cast<int>(ev->Get()->Record.GetStatusCode()) << ", issues size=" << ev->Get()->Record.IssuesSize() << ", sender=" << ev->Sender; diff --git a/ydb/library/yql/providers/dq/actors/worker_actor.cpp b/ydb/library/yql/providers/dq/actors/worker_actor.cpp index 69416229bc8..1893f63e41c 100644 --- a/ydb/library/yql/providers/dq/actors/worker_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/worker_actor.cpp @@ -268,6 +268,7 @@ private: .InputDesc = input, .InputIndex = static_cast<ui64>(inputId), .TxId = TraceId, + .TaskId = Task.GetId(), .SecureParams = secureParams, .TaskParams = taskParams, .ComputeActorId = SelfId(), @@ -298,6 +299,7 @@ private: .OutputDesc = output, .OutputIndex = static_cast<ui64>(outputId), .TxId = TraceId, + .TaskId = Task.GetId(), .Callback = this, .SecureParams = secureParams, .TaskParams = taskParams, @@ -556,9 +558,10 @@ private: continue; } auto guard = source.TypeEnv->BindAllocator(); + TMaybe<TInstant> watermark; NKikimr::NMiniKQL::TUnboxedValueVector batch; bool finished = false; - const i64 space = source.Source->GetAsyncInputData(batch, finished, freeSpace); + const i64 space = source.Source->GetAsyncInputData(batch, watermark, finished, freeSpace); const ui64 index = inputIndex; if (space <= 0) { continue; diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp index 25582f97af6..110b78eb312 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp @@ -48,6 +48,8 @@ TDqConfiguration::TDqConfiguration() { REGISTER_SETTING(*this, WorkerFilter); REGISTER_SETTING(*this, _EnablePrecompute); REGISTER_SETTING(*this, EnableDqReplicate); + REGISTER_SETTING(*this, WatermarksMode); + REGISTER_SETTING(*this, WatermarksGranularityMs); } } // namespace NYql diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index cf12ecf9115..dd4b1b35077 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -72,7 +72,8 @@ struct TDqSettings { NCommon::TConfSetting<bool, false> _EnablePrecompute; NCommon::TConfSetting<bool, false> EnableDqReplicate; NCommon::TConfSetting<bool, false> _EnableGraceJoin; - + NCommon::TConfSetting<TString, false> WatermarksMode; + NCommon::TConfSetting<ui64, false> WatermarksGranularityMs; NCommon::TConfSetting<TString, false> WorkerFilter; @@ -112,6 +113,8 @@ struct TDqSettings { SAVE_SETTING(_FallbackOnRuntimeErrors); SAVE_SETTING(WorkerFilter); SAVE_SETTING(ComputeActorType); + SAVE_SETTING(WatermarksMode); + SAVE_SETTING(WatermarksGranularityMs); #undef SAVE_SETTING } diff --git a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp index a29b31cb668..c60a2290c5d 100644 --- a/ydb/library/yql/providers/dq/opt/logical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/logical_optimize.cpp @@ -171,6 +171,10 @@ private: const auto loadLambda = BuildLoadHopLambda(aggregate, ctx); const auto mergeLambda = BuildMergeHopLambda(aggregate, ctx); const auto finishLambda = BuildFinishHopLambda(aggregate, ctx); + const auto watermarkMode = BuildWatermarkMode(aggregate, ctx); + if (!watermarkMode) { + return nullptr; + } const auto streamArg = Build<TCoArgument>(ctx, pos).Name("stream").Done(); auto multiHoppingCoreBuilder = Build<TCoMultiHoppingCore>(ctx, pos) @@ -185,7 +189,8 @@ private: .MergeHandler(mergeLambda) .FinishHandler(finishLambda) .SaveHandler(saveLambda) - .LoadHandler(loadLambda); + .LoadHandler(loadLambda) + .WatermarkMode(*watermarkMode); if (Config->AnalyticsHopping.Get().GetOrElse(false)) { return Build<TCoPartitionsByKeys>(ctx, node.Pos()) @@ -781,6 +786,17 @@ private: .Ptr(); } + TMaybe<TExprNode::TPtr> BuildWatermarkMode(const TCoAggregate& aggregate, TExprContext& ctx) { + const auto analyticsMode = Config->AnalyticsHopping.Get().GetOrElse(false); + const bool enableWatermarks = !analyticsMode && Config->WatermarksMode.Get() == "default"; + if (enableWatermarks && Config->ComputeActorType.Get() != "async") { + ctx.AddError(TIssue(ctx.GetPosition(aggregate.Pos()), "Watermarks should be used only with async compute actor")); + return Nothing(); + } + + return ctx.NewAtom(aggregate.Pos(), ToString(enableWatermarks)); + } + private: TDqConfiguration::TPtr Config; TTypeAnnotationContext& TypesCtx; diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index 06c81121921..bc195decbc5 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -259,7 +259,7 @@ namespace NYql::NDqs { SourceTaskID = resultTask.Id; } - BuildCheckpointingMode(); + BuildCheckpointingAndWatermarksMode(true, settings->WatermarksMode.Get().GetOrElse("") == "default"); return TasksGraph.GetTasks().size(); } @@ -279,7 +279,7 @@ namespace NYql::NDqs { return sourceType == "PqSource"; // Now it is the only infinite source type. Others are finite. } - void TDqsExecutionPlanner::BuildCheckpointingMode() { + void TDqsExecutionPlanner::BuildCheckpointingAndWatermarksMode(bool enableCheckpoints, bool enableWatermarks) { std::stack<TDqsTasksGraph::TTaskType*> tasksStack; std::vector<bool> processedTasks(TasksGraph.GetTasks().size()); for (TDqsTasksGraph::TTaskType& task : TasksGraph.GetTasks()) { @@ -312,33 +312,62 @@ namespace NYql::NDqs { continue; } - // Current task has all inputs processed, so determine its checkpointing mode now. + // Current task has all inputs processed, so determine its checkpointing and watermarks mode now. NDqProto::ECheckpointingMode checkpointingMode = NDqProto::CHECKPOINTING_MODE_DISABLED; - for (const auto& input : task.Inputs) { - if (input.SourceType) { - if (IsInfiniteSourceType(input.SourceType)) { - checkpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT; - break; - } - } else { - for (ui64 channelId : input.Channels) { - const NDq::TChannel& channel = TasksGraph.GetChannel(channelId); - if (channel.CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED) { + if (enableCheckpoints) { + for (const auto& input : task.Inputs) { + if (input.SourceType) { + if (IsInfiniteSourceType(input.SourceType)) { checkpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT; break; } + } else { + for (ui64 channelId : input.Channels) { + const NDq::TChannel& channel = TasksGraph.GetChannel(channelId); + if (channel.CheckpointingMode != NDqProto::CHECKPOINTING_MODE_DISABLED) { + checkpointingMode = NDqProto::CHECKPOINTING_MODE_DEFAULT; + break; + } + } + if (checkpointingMode == NDqProto::CHECKPOINTING_MODE_DEFAULT) { + break; + } } - if (checkpointingMode == NDqProto::CHECKPOINTING_MODE_DEFAULT) { - break; + } + } + + NDqProto::EWatermarksMode watermarksMode = NDqProto::WATERMARKS_MODE_DISABLED; + if (enableWatermarks) { + for (auto& input : task.Inputs) { + if (input.SourceType) { + if (IsInfiniteSourceType(input.SourceType)) { + watermarksMode = NDqProto::WATERMARKS_MODE_DEFAULT; + input.WatermarksMode = NDqProto::WATERMARKS_MODE_DEFAULT; + break; + } + } else { + for (ui64 channelId : input.Channels) { + const NDq::TChannel& channel = TasksGraph.GetChannel(channelId); + if (channel.WatermarksMode != NDqProto::WATERMARKS_MODE_DEFAULT) { + watermarksMode = NDqProto::WATERMARKS_MODE_DEFAULT; + break; + } + } + if (watermarksMode == NDqProto::WATERMARKS_MODE_DEFAULT) { + break; + } } } } // Apply mode to task and its outputs. task.CheckpointingMode = checkpointingMode; + task.WatermarksMode = watermarksMode; for (const auto& output : task.Outputs) { for (ui64 channelId : output.Channels) { - TasksGraph.GetChannel(channelId).CheckpointingMode = checkpointingMode; + auto& channel = TasksGraph.GetChannel(channelId); + channel.CheckpointingMode = checkpointingMode; + channel.WatermarksMode = watermarksMode; } } @@ -398,6 +427,7 @@ namespace NYql::NDqs { auto* sourceProto = inputDesc.MutableSource(); *sourceProto->MutableSettings() = *input.SourceSettings; sourceProto->SetType(input.SourceType); + sourceProto->SetWatermarksMode(input.WatermarksMode); } else { FillInputDesc(inputDesc, input); } diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.h b/ydb/library/yql/providers/dq/planner/execution_planner.h index d647665c756..4049cff1a57 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.h +++ b/ydb/library/yql/providers/dq/planner/execution_planner.h @@ -72,7 +72,7 @@ namespace NYql::NDqs { void FillOutputDesc(NDqProto::TTaskOutput& outputDesc, const TTaskOutput& output); void GatherPhyMapping(THashMap<std::tuple<TString, TString>, TString>& clusters, THashMap<std::tuple<TString, TString, TString>, TString>& tables); - void BuildCheckpointingMode(); + void BuildCheckpointingAndWatermarksMode(bool enableCheckpoints, bool enableWatermarks); bool IsEgressTask(const TDqsTasksGraph::TTaskType& task) const; private: diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index 23a9eca9ff4..56c64e0fe07 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -913,6 +913,11 @@ public: ythrow yexception() << "unimplemented"; } + void Push(NDqProto::TWatermark&& watermark) override { + Y_UNUSED(watermark); + ythrow yexception() << "unimplemented"; + } + void Push(NDqProto::TCheckpoint&& checkpoint) override { Y_UNUSED(checkpoint); ythrow yexception() << "unimplemented"; @@ -956,6 +961,10 @@ public: } } + bool Pop(NDqProto::TWatermark&) override { + return false; + } + bool Pop(NDqProto::TCheckpoint&) override { return false; } @@ -1179,6 +1188,11 @@ public: Y_FAIL("Unimplemented"); } + bool Pop(NDqProto::TWatermark& watermark) override { + Y_UNUSED(watermark); + Y_FAIL("Watermarks are not supported"); + } + bool Pop(NDqProto::TCheckpoint& checkpoint) override { Y_UNUSED(checkpoint); Y_FAIL("Checkpoints are not supported"); @@ -1193,6 +1207,11 @@ public: Y_FAIL("Unimplemented"); } + void Push(NDqProto::TWatermark&& watermark) override { + Y_UNUSED(watermark); + Y_FAIL("Watermarks are not supported"); + } + void Push(NDqProto::TCheckpoint&& checkpoint) override { Y_UNUSED(checkpoint); Y_FAIL("Checkpoints are not supported"); @@ -1598,6 +1617,14 @@ public: ythrow yexception() << "unimplemented"; } + void SetWatermarkIn(TInstant) override { + ythrow yexception() << "unimplemented"; + } + + const NKikimr::NMiniKQL::TWatermark& GetWatermark() const override { + ythrow yexception() << "unimplemented"; + } + private: TIntrusivePtr<TTaskRunner> Delegate; NDqProto::TDqTask Task; diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index 96532f7ba84..b5ef754dd18 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -345,6 +345,7 @@ private: channelId, std::move(chunks), Nothing(), + Nothing(), isFinished, changed, GetSensors(response), diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp index 2b4c8dcb5a8..c7507a91e01 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.cpp @@ -15,33 +15,55 @@ namespace { const std::unordered_map<TString, NYql::NDq::TPqMetaExtractor::TPqMetaExtractorLambda> ExtractorsMap = { { "_yql_sys_create_time", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ - return NYql::NUdf::TUnboxedValuePod(static_cast<NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>::TLayout>(message.GetCreateTime().MicroSeconds())); + using TDataType = NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>; + return std::make_pair( + NYql::NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetCreateTime().MicroSeconds())), + NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize + ); } }, { "_yql_sys_write_time", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ - return NYql::NUdf::TUnboxedValuePod(static_cast<NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>::TLayout>(message.GetWriteTime().MicroSeconds())); + using TDataType = NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>; + return std::make_pair( + NYql::NUdf::TUnboxedValuePod(static_cast<TDataType::TLayout>(message.GetWriteTime().MicroSeconds())), + NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize + ); } }, { "_yql_sys_partition_id", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ - return NYql::NUdf::TUnboxedValuePod(message.GetPartitionStream()->GetPartitionId()); + using TDataType = NYql::NUdf::TDataType<ui64>; + return std::make_pair( + NYql::NUdf::TUnboxedValuePod(message.GetPartitionStream()->GetPartitionId()), + NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize + ); } }, { "_yql_sys_offset", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ - return NYql::NUdf::TUnboxedValuePod(message.GetOffset()); + using TDataType = NYql::NUdf::TDataType<ui64>; + return std::make_pair( + NYql::NUdf::TUnboxedValuePod(message.GetOffset()), + NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize); } }, { "_yql_sys_message_group_id", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ const auto& data = message.GetMessageGroupId(); - return NKikimr::NMiniKQL::MakeString(NYql::NUdf::TStringRef(data.Data(), data.Size())); + return std::make_pair( + NKikimr::NMiniKQL::MakeString(NYql::NUdf::TStringRef(data.Data(), data.Size())), + data.Size() + ); } }, { "_yql_sys_seq_no", [](const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message){ - return NYql::NUdf::TUnboxedValuePod(static_cast<NYql::NUdf::TDataType<NYql::NUdf::TTimestamp>::TLayout>(message.GetSeqNo())); + using TDataType = NYql::NUdf::TDataType<ui64>; + return std::make_pair( + NYql::NUdf::TUnboxedValuePod(message.GetSeqNo()), + NYql::NUdf::GetDataTypeInfo(TDataType::Slot).FixedSize + ); } }, }; diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.h index df932bc48f2..d9207757630 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_meta_extractor.h @@ -12,7 +12,7 @@ namespace NYql::NDq { struct TPqMetaExtractor { - using TPqMetaExtractorLambda = std::function<NYql::NUdf::TUnboxedValuePod(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage&)>; + using TPqMetaExtractorLambda = std::function<std::pair<NYql::NUdf::TUnboxedValuePod, i64>(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage&)>; public: TPqMetaExtractor(); diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp index 2c2605d3264..0b25a022da6 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp @@ -3,6 +3,7 @@ #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io_factory.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> +#include <ydb/library/yql/dq/actors/compute/dq_source_watermark_tracker.h> #include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/dq/common/dq_common.h> #include <ydb/library/yql/dq/proto/dq_checkpoint.pb.h> @@ -94,6 +95,7 @@ public: TDqPqReadActor( ui64 inputIndex, const TTxId& txId, + ui64 taskId, const THolderFactory& holderFactory, NPq::NProto::TDqPqTopicSource&& sourceParams, NPq::NProto::TDqReadTaskParams&& readParams, @@ -108,7 +110,7 @@ public: , BufferSize(bufferSize) , RangesMode(rangesMode) , HolderFactory(holderFactory) - , LogPrefix(TStringBuilder() << "TxId: " << TxId << ", PQ source. ") + , LogPrefix(TStringBuilder() << "SelfId: " << this->SelfId() << ", TxId: " << TxId << ", task: " << taskId << ". PQ source. ") , Driver(std::move(driver)) , CredentialsProviderFactory(std::move(credentialsProviderFactory)) , SourceParams(std::move(sourceParams)) @@ -121,6 +123,8 @@ public: for (const auto& fieldName : SourceParams.GetMetadataFields()) { MetadataFields.emplace_back(fieldName, fieldsExtractor.FindExtractorLambda(fieldName)); } + + InitWatermarkTracker(); } NYdb::NPersQueue::TPersQueueClientSettings GetPersQueueClientSettings() const { @@ -190,6 +194,8 @@ public: } } StartingMessageTimestamp = minStartingMessageTs; + InitWatermarkTracker(); + if (ReadSession) { ReadSession.reset(); GetReadSession(); @@ -228,12 +234,16 @@ private: ) void Handle(TEvPrivate::TEvSourceDataReady::TPtr&, const TActorContext& ctx) { + SRC_LOG_T("Source data ready"); SubscribedOnEvent = false; ctx.Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } // IActor & IDqComputeActorAsyncInput void PassAway() override { // Is called from Compute Actor + std::queue<TReadyBatch> empty; + ReadyBuffer.swap(empty); + if (ReadSession) { ReadSession->Close(TDuration::Zero()); ReadSession.reset(); @@ -242,38 +252,71 @@ private: TActor<TDqPqReadActor>::PassAway(); } - i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool&, i64 freeSpace) override { + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>& watermark, bool&, i64 freeSpace) override { + SRC_LOG_T("GetAsyncInputData freeSpace = " << freeSpace); + + i64 usedSpace = 0; + if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) { + return usedSpace; + } + auto events = GetReadSession().GetEvents(false, TMaybe<size_t>(), static_cast<size_t>(Max<i64>(freeSpace, 0))); - ui32 batchSize = 0; + ui32 batchItemsEstimatedCount = 0; for (auto& event : events) { if (const auto* val = std::get_if<NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent>(&event)) { - batchSize += val->GetMessages().size(); + batchItemsEstimatedCount += val->GetMessages().size(); } } - buffer.clear(); - buffer.reserve(batchSize); - i64 usedSpace = 0; for (auto& event : events) { - std::visit(TPQEventProcessor{*this, buffer, usedSpace, LogPrefix}, event); + std::visit(TPQEventProcessor{*this, batchItemsEstimatedCount, LogPrefix}, event); } - SubscribeOnNextEvent(); + if (MaybeReturnReadyBatch(buffer, watermark, usedSpace)) { + return usedSpace; + } - return usedSpace; + watermark = Nothing(); + buffer.clear(); + return 0; } private: - NYdb::NPersQueue::TReadSessionSettings GetReadSessionSettings() const { - NYdb::NPersQueue::TTopicReadSettings topicReadSettings; - topicReadSettings.Path(SourceParams.GetTopicPath()); + std::vector<ui64> GetPartitionsToRead() const { + std::vector<ui64> res; + ui64 currentPartition = ReadParams.GetPartitioningParams().GetEachTopicPartitionGroupId(); do { - topicReadSettings.AppendPartitionGroupIds(currentPartition + 1); // 1-based. + res.emplace_back(currentPartition + 1); // 1-based. currentPartition += ReadParams.GetPartitioningParams().GetDqPartitionsCount(); } while (currentPartition < ReadParams.GetPartitioningParams().GetTopicPartitionsCount()); + return res; + } + + void InitWatermarkTracker() { + SRC_LOG_D("Watermarks enabled: " << SourceParams.GetWatermarks().GetEnabled() << " granularity: " + << SourceParams.GetWatermarks().GetGranularityUs() << " microseconds"); + + if (!SourceParams.GetWatermarks().GetEnabled()) { + return; + } + + WatermarkTracker = std::make_unique<TDqSourceWatermarkTracker<TPartitionKey>>( + TDuration::MicroSeconds(SourceParams.GetWatermarks().GetGranularityUs()), + StartingMessageTimestamp, + GetPartitionsToRead().size() // TODO: for the internal LB there is a problem here. See YQ-1384 + ); + } + + NYdb::NPersQueue::TReadSessionSettings GetReadSessionSettings() const { + NYdb::NPersQueue::TTopicReadSettings topicReadSettings; + topicReadSettings.Path(SourceParams.GetTopicPath()); + for (const auto partitionId : GetPartitionsToRead()) { + topicReadSettings.AppendPartitionGroupIds(partitionId); + } + return NYdb::NPersQueue::TReadSessionSettings() .DisableClusterDiscovery(SourceParams.GetClusterType() == NPq::NProto::DataStreams) .AppendTopics(topicReadSettings) @@ -283,18 +326,6 @@ private: .RangesMode(RangesMode); } - void UpdateStateWithNewReadData(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) { - if (event.GetMessages().empty()) { - return; - } - - assert(MaxElementBy(event.GetMessages(), [](const auto& message){ return message.GetOffset(); }) - ->GetOffset() == event.GetMessages().back().GetOffset()); - - const auto maxOffset = event.GetMessages().back().GetOffset(); - PartitionToOffset[MakePartitionKey(event.GetPartitionStream())] = maxOffset + 1; // Next offset to read from. - } - static TPartitionKey MakePartitionKey(const NYdb::NPersQueue::TPartitionStream::TPtr& partitionStreamPtr) { return std::make_pair(partitionStreamPtr->GetCluster(), partitionStreamPtr->GetPartitionId()); } @@ -309,8 +340,55 @@ private: } } + struct TReadyBatch { + public: + TReadyBatch(TMaybe<TInstant> watermark, ui32 dataCapacity) : Watermark(watermark){ + Data.reserve(dataCapacity); + } + + public: + TMaybe<TInstant> Watermark; + TUnboxedValueVector Data; + i64 UsedSpace = 0; + THashMap<NYdb::NPersQueue::TPartitionStream::TPtr, TList<std::pair<ui64, ui64>>> OffsetRanges; // [start, end) + }; + + bool MaybeReturnReadyBatch(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>& watermark, i64& usedSpace) { + if (ReadyBuffer.empty()) { + SubscribeOnNextEvent(); + return false; + } + + auto& readyBatch = ReadyBuffer.front(); + buffer.swap(readyBatch.Data); + watermark = readyBatch.Watermark; + usedSpace = readyBatch.UsedSpace; + + for (const auto& [partitionStream, ranges] : readyBatch.OffsetRanges) { + for (const auto& [start, end] : ranges) { + CurrentDeferredCommit.Add(partitionStream, start, end); + } + PartitionToOffset[MakePartitionKey(partitionStream)] = ranges.back().second; + } + + ReadyBuffer.pop(); + + if (ReadyBuffer.empty()) { + SubscribeOnNextEvent(); + } else { + Send(SelfId(), new TEvPrivate::TEvSourceDataReady()); + } + + SRC_LOG_T("Return ready batch." + << " DataCount = " << buffer.size() + << " Watermark = " << (watermark ? ToString(*watermark) : "none") + << " Used space = " << usedSpace); + return true; + } + struct TPQEventProcessor { void operator()(NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) { + const auto partitionKey = MakePartitionKey(event.GetPartitionStream()); for (const auto& message : event.GetMessages()) { const TString& data = message.GetData(); @@ -322,24 +400,19 @@ private: continue; } - NUdf::TUnboxedValuePod item; - if (Self.MetadataFields.empty()) { - item = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size())); - } else { - NUdf::TUnboxedValue* itemPtr; - item = Self.HolderFactory.CreateDirectArrayHolder(Self.MetadataFields.size() + 1, itemPtr); - *(itemPtr++) = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size())); + auto [item, size] = CreateItem(message); - for (const auto& [name, extractor] : Self.MetadataFields) { - *(itemPtr++) = extractor(message); - } - } + auto& curBatch = GetActiveBatch(partitionKey, message.GetWriteTime()); + curBatch.Data.emplace_back(std::move(item)); + curBatch.UsedSpace += size; - Batch.emplace_back(item); - UsedSpace += data.Size(); + auto& offsets = curBatch.OffsetRanges[message.GetPartitionStream()]; + if (!offsets.empty() && offsets.back().second == message.GetOffset()) { + offsets.back().second = message.GetOffset() + 1; + } else { + offsets.emplace_back(message.GetOffset(), message.GetOffset() + 1); + } } - Self.UpdateStateWithNewReadData(event); - Self.CurrentDeferredCommit.Add(event); } void operator()(NYdb::NPersQueue::TSessionClosedEvent& ev) { @@ -366,9 +439,56 @@ private: void operator()(NYdb::NPersQueue::TReadSessionEvent::TPartitionStreamClosedEvent&) { } + TReadyBatch& GetActiveBatch(const TPartitionKey& partitionKey, TInstant time) { + if (Y_UNLIKELY(Self.ReadyBuffer.empty())) { + Self.ReadyBuffer.emplace(Nothing(), BatchCapacity); + } + + TReadyBatch& activeBatch = Self.ReadyBuffer.back(); + + if (!Self.WatermarkTracker) { + // Watermark tracker disabled => there is no way more than one batch will be used + return activeBatch; + } + + const auto maybeNewWatermark = Self.WatermarkTracker->NotifyNewPartitionTime(partitionKey, time); + if (!maybeNewWatermark) { + // Watermark wasn't moved => use current active batch + return activeBatch; + } + + SRC_LOG_D("New watermark " << *maybeNewWatermark << " was generated"); + activeBatch.Watermark = maybeNewWatermark; // Write watermark to current batch + + return Self.ReadyBuffer.emplace(Nothing(), BatchCapacity); // And open new batch + } + + std::pair<NUdf::TUnboxedValuePod, i64> CreateItem(const NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent::TMessage& message) { + const TString& data = message.GetData(); + + i64 usedSpace = 0; + NUdf::TUnboxedValuePod item; + if (Self.MetadataFields.empty()) { + item = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size())); + usedSpace += data.Size(); + } else { + NUdf::TUnboxedValue* itemPtr; + item = Self.HolderFactory.CreateDirectArrayHolder(Self.MetadataFields.size() + 1, itemPtr); + *(itemPtr++) = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(data.Data(), data.Size())); + usedSpace += data.Size(); + + for (const auto& [name, extractor] : Self.MetadataFields) { + auto [ub, size] = extractor(message); + *(itemPtr++) = std::move(ub); + usedSpace += size; + } + } + + return std::make_pair(item, usedSpace); + } + TDqPqReadActor& Self; - TUnboxedValueVector& Batch; - i64& UsedSpace; + ui32 BatchCapacity; const TString& LogPrefix; }; @@ -393,12 +513,15 @@ private: NYdb::NPersQueue::TDeferredCommit CurrentDeferredCommit; bool SubscribedOnEvent = false; std::vector<std::tuple<TString, TPqMetaExtractor::TPqMetaExtractorLambda>> MetadataFields; + std::queue<TReadyBatch> ReadyBuffer; + std::unique_ptr<TDqSourceWatermarkTracker<TPartitionKey>> WatermarkTracker; }; std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor( NPq::NProto::TDqPqTopicSource&& settings, ui64 inputIndex, TTxId txId, + ui64 taskId, const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& taskParams, NYdb::TDriver driver, @@ -422,6 +545,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor( TDqPqReadActor* actor = new TDqPqReadActor( inputIndex, txId, + taskId, holderFactory, std::move(settings), std::move(readTaskParamsMsg), @@ -446,6 +570,7 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv std::move(settings), args.InputIndex, args.TxId, + args.TaskId, args.SecureParams, args.TaskParams, driver, diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h index bc3e5292fae..d8411fb04a5 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h @@ -26,6 +26,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor( NPq::NProto::TDqPqTopicSource&& settings, ui64 inputIndex, TTxId txId, + ui64 taskId, const THashMap<TString, TString>& secureParams, const THashMap<TString, TString>& taskParams, NYdb::TDriver driver, diff --git a/ydb/library/yql/providers/pq/common/yql_names.h b/ydb/library/yql/providers/pq/common/yql_names.h index e4ac47c5cd8..184a5d5eb0e 100644 --- a/ydb/library/yql/providers/pq/common/yql_names.h +++ b/ydb/library/yql/providers/pq/common/yql_names.h @@ -9,5 +9,6 @@ constexpr TStringBuf ConsumerSetting = "Consumer"; constexpr TStringBuf EndpointSetting = "Endpoint"; constexpr TStringBuf UseSslSetting = "UseSsl"; constexpr TStringBuf AddBearerToTokenSetting = "AddBearerToToken"; +constexpr TStringBuf WatermarksGranularityUsSetting = "WatermarksGranularityUs"; } // namespace NYql diff --git a/ydb/library/yql/providers/pq/proto/dq_io.proto b/ydb/library/yql/providers/pq/proto/dq_io.proto index c5c3ebe1d0f..5ade3091b42 100644 --- a/ydb/library/yql/providers/pq/proto/dq_io.proto +++ b/ydb/library/yql/providers/pq/proto/dq_io.proto @@ -14,6 +14,11 @@ enum EClusterType { DataStreams = 2; } +message TWatermarks { + bool Enabled = 1; + uint64 GranularityUs = 2; +} + message TDqPqTopicSource { string TopicPath = 1; string ConsumerName = 2; @@ -25,6 +30,7 @@ message TDqPqTopicSource { bool AddBearerToToken = 8; string DatabaseId = 9; repeated string MetadataFields = 10; + TWatermarks Watermarks = 11; } message TDqPqTopicSink { diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index e4620cb2a8e..5285e586d09 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -69,7 +69,7 @@ public: return 0; } - TExprNode::TPtr WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext& ctx) override { + TExprNode::TPtr WrapRead(const TDqSettings& dqSettings, const TExprNode::TPtr& read, TExprContext& ctx) override { if (const auto& maybePqReadTopic = TMaybeNode<TPqReadTopic>(read)) { const auto& pqReadTopic = maybePqReadTopic.Cast(); @@ -126,7 +126,7 @@ public: .Input<TDqPqTopicSource>() .Topic(pqReadTopic.Topic()) .Columns(std::move(columns)) - .Settings(BuildTopicReadSettings(clusterName, read->Pos(), ctx)) + .Settings(BuildTopicReadSettings(clusterName, dqSettings, read->Pos(), ctx)) .Token<TCoSecureParam>() .Name().Build(token) .Build() @@ -200,6 +200,9 @@ public: srcDesc.SetUseSsl(FromString<bool>(Value(setting))); } else if (name == AddBearerToTokenSetting) { srcDesc.SetAddBearerToToken(FromString<bool>(Value(setting))); + } else if (name == WatermarksGranularityUsSetting) { + srcDesc.MutableWatermarks()->SetEnabled(true); + srcDesc.MutableWatermarks()->SetGranularityUs(FromString<ui64>(Value(setting))); } } @@ -259,7 +262,12 @@ public: } } - NNodes::TCoNameValueTupleList BuildTopicReadSettings(const TString& cluster, TPositionHandle pos, TExprContext& ctx) const { + NNodes::TCoNameValueTupleList BuildTopicReadSettings( + const TString& cluster, + const TDqSettings& dqSettings, + TPositionHandle pos, + TExprContext& ctx) const + { TVector<TCoNameValueTuple> props; { @@ -283,6 +291,11 @@ public: Add(props, AddBearerToTokenSetting, "1", pos, ctx); } + if (dqSettings.WatermarksMode.Get().GetOrElse("") == "default") { + const auto granularity = TDuration::MilliSeconds(dqSettings.WatermarksGranularityMs.Get().GetOrElse(1000)); + Add(props, WatermarksGranularityUsSetting, ToString(granularity.MicroSeconds()), pos, ctx); + } + return Build<TCoNameValueTupleList>(ctx, pos) .Add(props) .Done(); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index a9ef106a6d6..51c63e42b60 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -227,7 +227,7 @@ private: } } - i64 GetAsyncInputData(TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final { + i64 GetAsyncInputData(TUnboxedValueVector& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final { i64 total = 0LL; if (!Blocks.empty()) { buffer.reserve(buffer.size() + Blocks.size()); @@ -680,7 +680,7 @@ private: void CommitState(const NDqProto::TCheckpoint&) final {} ui64 GetInputIndex() const final { return InputIndex; } - i64 GetAsyncInputData(TUnboxedValueVector& output, bool& finished, i64 free) final { + i64 GetAsyncInputData(TUnboxedValueVector& output, TMaybe<TInstant>&, bool& finished, i64 free) final { i64 total = 0LL; if (!Blocks.empty()) do { auto& block = std::get<NDB::Block>(Blocks.front()); diff --git a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp index ae7e78e0c7e..172483b1189 100644 --- a/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp +++ b/ydb/library/yql/providers/ydb/actors/yql_ydb_read_actor.cpp @@ -125,7 +125,7 @@ private: TActorBootstrapped<TYdbReadActor>::PassAway(); } - i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64 freeSpace) final { + i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, TMaybe<TInstant>&, bool& finished, i64 freeSpace) final { i64 total = 0LL; if (!Blocks.empty()) { buffer.reserve(buffer.size() + Blocks.size()); |