diff options
| author | Dmitry Kardymon <[email protected]> | 2026-05-14 18:50:50 +0300 |
|---|---|---|
| committer | GitHub <[email protected]> | 2026-05-14 15:50:50 +0000 |
| commit | d95de0baaf0791ef8a7764d498917b69ad503bf2 (patch) | |
| tree | 67e463ebc00183c7c04b72a66ff32cb95d395588 | |
| parent | 139172ebe216409dcd8ec4ef10b6ba077f16b2fd (diff) | |
YQ-5105 checkpoints/watermarks in channels 2.0 (#35734)
Co-authored-by: Pisarenko Grigoriy <[email protected]>
16 files changed, 181 insertions, 32 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 520b910c4e5..4feb691cc4c 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -157,6 +157,7 @@ public: , TasksGraph(Database, Request.Transactions, Request.TxAlloc, AggregationSettings, Counters, BufferActorId, UserToken) , ChannelService(channelService) , PartitionPruner(MakeHolder<TPartitionPruner>(Request.TxAlloc->HolderFactory, Request.TxAlloc->TypeEnv, std::move(partitionPrunerConfig))) + , EnableWatermarks(executerConfig.TableServiceConfig.GetEnableWatermarks()) { ArrayBufferMinFillPercentage = executerConfig.TableServiceConfig.GetArrayBufferMinFillPercentage(); BufferPageAllocSize = executerConfig.TableServiceConfig.GetBufferPageAllocSize(); @@ -1291,7 +1292,7 @@ protected: .BufferPageAllocSize = BufferPageAllocSize, .Query = Query, .CheckpointCoordinator = CheckpointCoordinatorId, - .EnableWatermarks = Request.QueryPhysicalGraph && Request.QueryPhysicalGraph->GetPreparedQuery().GetPhysicalQuery().GetEnableWatermarks(), + .EnableWatermarks = EnableWatermarks, }); auto err = Planner->PlanExecution(); @@ -1805,7 +1806,7 @@ protected: TKqpTasksGraph TasksGraph; std::shared_ptr<NYql::NDq::IDqChannelService> ChannelService; THolder<TPartitionPruner> PartitionPruner; - + bool EnableWatermarks = false; private: static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100); }; diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp index 93f5629f98a..8ffc4d65ad4 100644 --- a/ydb/core/kqp/executer_actor/kqp_planner.cpp +++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp @@ -665,12 +665,7 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() { void TKqpPlanner::PrepareCheckpoints() { const auto isStreamingQuery = UserRequestContext && UserRequestContext->IsStreamingQuery; - - if (!isStreamingQuery) { - return; - } - - const auto enableCheckpoints = static_cast<bool>(CheckpointCoordinatorId); + const auto enableCheckpoints = isStreamingQuery && static_cast<bool>(CheckpointCoordinatorId); TasksGraph.BuildCheckpointingAndWatermarksMode(enableCheckpoints, EnableWatermarks); if (!enableCheckpoints) { diff --git a/ydb/core/kqp/ut/federated_query/datastreams/common.cpp b/ydb/core/kqp/ut/federated_query/datastreams/common.cpp index e857e21c350..9a6df86ab16 100644 --- a/ydb/core/kqp/ut/federated_query/datastreams/common.cpp +++ b/ydb/core/kqp/ut/federated_query/datastreams/common.cpp @@ -40,7 +40,7 @@ NKikimrConfig::TAppConfig& TStreamingTestFixture::SetupAppConfig() { EnsureNotInitialized("AppConfig"); auto& result = AppConfig.emplace(); - result.MutableTableServiceConfig()->SetDqChannelVersion(1u); + result.MutableTableServiceConfig()->SetDqChannelVersion(2u); return result; } @@ -98,7 +98,7 @@ std::shared_ptr<TKikimrRunner> TStreamingTestFixture::GetKikimrRunner() { queryServiceConfig.SetEnableMatchRecognize(true); auto& tableServiceConfig = *AppConfig->MutableTableServiceConfig(); - tableServiceConfig.SetDqChannelVersion(1u); + tableServiceConfig.SetDqChannelVersion(2u); LogSettings .AddLogPriority(NKikimrServices::STREAMS_STORAGE_SERVICE, NLog::PRI_DEBUG) diff --git a/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp b/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp index 786ad264012..7a717a6e6e7 100644 --- a/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp @@ -1241,7 +1241,7 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) { CREATE STREAMING QUERY `{query_name}` AS DO BEGIN PRAGMA ydb.HashJoinMode = "map"; - PRAGMA ydb.DqChannelVersion = "1"; + PRAGMA ydb.DqChannelVersion = "2"; INSERT INTO `{pq_source}`.`{output_topic}` SELECT diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index f9addbc7422..b81a70b9dbd 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -746,6 +746,6 @@ message TKqpPhyQuery { optional bool ForceImmediateEffectsExecution = 15; uint32 LangVer = 16; optional bool DisableCheckpoints = 17; - optional bool EnableWatermarks = 18; + optional bool EnableWatermarks = 18 [deprecated=true]; optional EIsolationLevel DefaultTxMode = 19; } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp index 0342e8f41ff..f1e07513b97 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp @@ -448,6 +448,7 @@ void TDqComputeActorCheckpoints::DoCheckpoint() { if (SaveState()) { LOG_PCP_T("Injecting checkpoint barrier to outputs"); ComputeActor->InjectBarrierToOutputs(*PendingCheckpoint.Checkpoint); + ComputeActor->ResumeInputsByCheckpoint(); TryToSavePendingCheckpoint(); } } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 9f561a21245..ae40d3b4e0f 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1250,7 +1250,9 @@ protected: if (!Checkpoints) { Checkpoints = new TDqComputeActorCheckpoints(this->SelfId(), TxId, Task, this); Checkpoints->Init(this->SelfId(), this->RegisterWithSameMailbox(Checkpoints)); - Channels->SetCheckpointsSupport(); + if (Channels) { + Channels->SetCheckpointsSupport(); + } } TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Checkpoints->SelfId(), ev->Sender, ev->Release().Release()); Checkpoints->Receive(handle); diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h index b5d4bc63ccc..eec3c834b5d 100644 --- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h +++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h @@ -3,11 +3,12 @@ #include "dq_compute_actor_impl.h" #include "dq_compute_actor_async_input_helper.h" #include <ydb/library/yql/dq/actors/spilling/spiller_factory.h> +#include <ydb/library/yql/dq/runtime/dq_input_channel.h> namespace NYql::NDq { template<typename TDerived> -class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActorAsyncInputHelperSync> { +class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActorAsyncInputHelperSync>, public IDqInputChannelCallbacks { using TBase = TDqComputeActorBase<TDerived, TComputeActorAsyncInputHelperSync>; public: using TDqComputeActorBase<TDerived, TComputeActorAsyncInputHelperSync>::TDqComputeActorBase; @@ -285,6 +286,7 @@ protected: for (auto& [channelId, channel] : this->InputChannelsMap) { channel.Channel = TaskRunner->GetInputChannel(channelId); + channel.Channel->SetCallback(this); } for (auto& [inputIndex, source] : this->SourcesMap) { @@ -393,6 +395,7 @@ protected: this->ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished; this->ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished) || sentChunks; } + void DrainAsyncOutput(ui64 outputIndex, typename TBase::TAsyncOutputInfoBase& outputInfo) override final { this->ProcessOutputsState.AllOutputsFinished &= outputInfo.Finished; if (outputInfo.Finished && !this->Checkpoints) { @@ -431,6 +434,14 @@ protected: this->ProcessOutputsState.DataWasSent |= outputInfo.Finished || sent; } + void TakeCheckpoint(const NDqProto::TCheckpoint& checkpoint, ui64 channelId) override { + CA_LOG_T("Take checkpoint from channelId: " << channelId << ", checkpoint: " << checkpoint.ShortDebugString()); + auto* inputChannel = this->InputChannelsMap.FindPtr(channelId); + YQL_ENSURE(inputChannel, "task: " << this->Task.GetId() << ", unknown input channelId: " << channelId); + inputChannel->Pause(checkpoint); + this->Checkpoints->RegisterCheckpoint(checkpoint, channelId); + } + protected: TIntrusivePtr<IDqTaskRunner> TaskRunner; diff --git a/ydb/library/yql/dq/actors/protos/dq_events.proto b/ydb/library/yql/dq/actors/protos/dq_events.proto index 1a46cb0ff2b..be26c0ababe 100644 --- a/ydb/library/yql/dq/actors/protos/dq_events.proto +++ b/ydb/library/yql/dq/actors/protos/dq_events.proto @@ -269,6 +269,9 @@ message TEvChannelDataV2 { optional bool Leading = 13; optional bool Finished = 14; optional uint64 ConfirmedPopBytes = 15; + + optional TCheckpoint Checkpoint = 16; + optional TWatermark Watermark = 17; }; message TEvChannelAckV2 { diff --git a/ydb/library/yql/dq/runtime/dq_channel_service.cpp b/ydb/library/yql/dq/runtime/dq_channel_service.cpp index 5a3b3c2af79..41af67b2b97 100644 --- a/ydb/library/yql/dq/runtime/dq_channel_service.cpp +++ b/ydb/library/yql/dq/runtime/dq_channel_service.cpp @@ -3,6 +3,7 @@ #include "dq_arrow_helpers.h" #include "dq_channel_service_impl.h" +#include "dq_output.h" #include <ydb/library/actors/core/log.h> #include <ydb/library/yql/dq/actors/dq.h> @@ -50,6 +51,19 @@ void BufferToData(TDataChunk& data, TBuffer&& buffer) { data.Finished = ReadNumber<bool>(source); data.Timestamp = TInstant::MicroSeconds(ReadNumber<ui64>(source)); + bool hasCheckpoint = ReadNumber<bool>(source); + if (hasCheckpoint) { + data.Checkpoint = NDqProto::TCheckpoint{}; + data.Checkpoint->SetId(ReadNumber<ui64>(source)); + data.Checkpoint->SetGeneration(ReadNumber<ui64>(source)); + data.Checkpoint->SetType(static_cast<NYql::NDqProto::ECheckpointType>(ReadNumber<ui8>(source))); + } + bool hasWatermark = ReadNumber<bool>(source); + if (hasWatermark) { + data.Watermark = NDqProto::TWatermark{}; + data.Watermark->SetTimestampUs(ReadNumber<ui64>(source)); + } + ui64 size = ReadNumber<ui64>(source); YQL_ENSURE(size == source.size(), "Spilled data is corrupted"); data.Buffer = TChunkedBuffer(source, sharedBuffer); @@ -65,6 +79,16 @@ TChunkedBuffer DataToBuffer(TDataChunk&& data) { AppendNumber<bool>(result, data.Leading); AppendNumber<bool>(result, data.Finished); AppendNumber<ui64>(result, data.Timestamp.MicroSeconds()); + AppendNumber<bool>(result, !data.Checkpoint.Empty()); + if (data.Checkpoint) { + AppendNumber<ui64>(result, data.Checkpoint->GetId()); + AppendNumber<ui64>(result, data.Checkpoint->GetGeneration()); + AppendNumber<ui8>(result, data.Checkpoint->GetType()); + } + AppendNumber<bool>(result, !data.Watermark.Empty()); + if (data.Watermark) { + AppendNumber<ui64>(result, data.Watermark->GetTimestampUs()); + } AppendNumber<ui64>(result, data.Buffer.Size()); result.Append(std::move(data.Buffer)); @@ -1023,6 +1047,14 @@ void TNodeState::SendMessage(std::shared_ptr<TOutputItem> item) { } ev->Record.SetConfirmedPopBytes(item->Descriptor->RemotePopBytes.load()); + if (item->Data.Checkpoint) { + *ev->Record.MutableCheckpoint() = item->Data.Checkpoint.GetRef(); + } + + if (item->Data.Watermark) { + *ev->Record.MutableWatermark() = item->Data.Watermark.GetRef(); + } + ui32 flags = NActors::IEventHandle::FlagTrackDelivery; if (!Subscribed.exchange(true)) { flags |= NActors::IEventHandle::FlagSubscribeOnSession; @@ -1135,7 +1167,14 @@ void TNodeState::HandleChannelData(TEvDqCompute::TEvChannelDataV2::TPtr& ev) { // data.Timestamp = TInstant::MicroSeconds(record.GetSendTime()); } data.Bytes = record.GetBytes(); - Y_ENSURE(data.Bytes > data.Buffer.Size()); // record.GetBytes() == data.Buffer.Size() + const + Y_ENSURE(data.Bytes > data.Buffer.Size(), "data.Bytes " << data.Bytes << " data.Buffer.Size() " << data.Buffer.Size()); // record.GetBytes() == data.Buffer.Size() + const + + if (record.HasWatermark()) { + data.Watermark = record.GetWatermark(); + } + if (record.HasCheckpoint()) { + data.Checkpoint = record.GetCheckpoint(); + } if (descriptor->PushDataChunk(std::move(data))) { UpdateProgress(descriptor); } @@ -2093,12 +2132,31 @@ TString TDqChannelService::GetDebugInfo() { // TFastDqOutputChannel:: bool TFastDqInputChannel::Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>& watermark) { - Y_UNUSED(watermark); + + if (PausedByCheckpoint) { + return false; + } TDataChunk chunk; - auto popResult = Buffer->Pop(chunk) && !chunk.Buffer.Empty(); + bool popResult = Buffer->Pop(chunk); + PushStats.PopTime = TInstant::Now(); + PushStats.PopResult = popResult; + + if (popResult && chunk.Checkpoint) { + if (Callback) { + Callback->TakeCheckpoint(*chunk.Checkpoint, GetChannelId()); + } + Y_ENSURE(batch.RowCount() == 0); + return false; + } - if (popResult) { + if (popResult && chunk.Watermark) { + watermark = TInstant::MicroSeconds(chunk.Watermark->GetTimestampUs()); + return true; + } + + bool hasData = popResult && !chunk.Buffer.Empty(); + if (hasData) { if (chunk.TransportVersion != Deserializer->TransportVersion || chunk.PackerVersion != Deserializer->PackerVersion) { auto deserializer = CreateDeserializer(Deserializer->RowType, chunk.TransportVersion, chunk.PackerVersion, Nothing(), Deserializer->HolderFactory); Deserializer = std::move(deserializer); @@ -2107,10 +2165,7 @@ bool TFastDqInputChannel::Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMay Y_ENSURE(batch.RowCount() > 0); } - PushStats.PopTime = TInstant::Now(); - PushStats.PopResult = popResult; - - return popResult; + return hasData; } void TFastDqOutputChannel::Bind(NActors::TActorId outputActorId, NActors::TActorId inputActorId) { diff --git a/ydb/library/yql/dq/runtime/dq_channel_service.h b/ydb/library/yql/dq/runtime/dq_channel_service.h index 9b6ad96978a..f50dfd76d69 100644 --- a/ydb/library/yql/dq/runtime/dq_channel_service.h +++ b/ydb/library/yql/dq/runtime/dq_channel_service.h @@ -6,7 +6,9 @@ #include <ydb/library/actors/core/actorid.h> #include <ydb/library/actors/core/actorsystem.h> +#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h> + #include <ydb/library/yql/dq/proto/dq_transport.pb.h> #include <yql/essentials/minikql/computation/mkql_computation_node_pack.h> @@ -64,6 +66,24 @@ public: Timestamp = TInstant::Now(); } + TDataChunk( + NDqProto::TCheckpoint&& checkpoint, + bool leading) + : Bytes(1) + , Leading(leading) + , Timestamp(TInstant::Now()) + , Checkpoint(std::move(checkpoint)) + {} + + TDataChunk( + NDqProto::TWatermark&& watermark, + bool leading) + : Bytes (1) + , Leading(leading) + , Timestamp(TInstant::Now()) + , Watermark(std::move(watermark)) + {} + TChunkedBuffer Buffer; ui64 Rows = 0; @@ -73,6 +93,8 @@ public: bool Leading = false; bool Finished = false; TInstant Timestamp; + TMaybe<NDqProto::TCheckpoint> Checkpoint; + TMaybe<NDqProto::TWatermark> Watermark; }; class IChannelBuffer { diff --git a/ydb/library/yql/dq/runtime/dq_channel_service_impl.h b/ydb/library/yql/dq/runtime/dq_channel_service_impl.h index 818cfa41c92..48bd0115e73 100644 --- a/ydb/library/yql/dq/runtime/dq_channel_service_impl.h +++ b/ydb/library/yql/dq/runtime/dq_channel_service_impl.h @@ -90,6 +90,8 @@ public: virtual ~TOutputSerializer() {}; virtual void Push(NUdf::TUnboxedValue&& value) = 0; + virtual void Push(NDqProto::TCheckpoint&& checkpoint) = 0; + virtual void Push(NDqProto::TWatermark&& watermark) = 0; virtual void WidePush(NUdf::TUnboxedValue* values, ui32 width) = 0; virtual void Flush(bool finished) = 0; @@ -787,12 +789,16 @@ public: } } - void Push(NDqProto::TWatermark&&) override { - Y_ENSURE(false); + void Push(NDqProto::TWatermark&& watermark) override { + if (!Serializer->Buffer->IsFinished()) { + Serializer->Push(std::move(watermark)); + } } - void Push(NDqProto::TCheckpoint&&) override { - Y_ENSURE(false); + void Push(NDqProto::TCheckpoint&& checkpoint) override { + if (!Serializer->Buffer->IsFinished()) { + Serializer->Push(std::move(checkpoint)); + } } void Finish() override { @@ -903,13 +909,16 @@ public: } bool Empty() const override { + if (PausedByCheckpoint) { + return true; + } return Buffer->IsEmpty(); } bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>& watermark) override; bool IsFinished() const override { - return Buffer->IsFinished(); + return Buffer->IsFinished() && !PausedByCheckpoint; } NKikimr::NMiniKQL::TType* GetInputType() const override { @@ -927,15 +936,17 @@ public: } void PauseByCheckpoint() override { - Y_ENSURE(false); + Y_ENSURE(!PausedByCheckpoint); + PausedByCheckpoint = true; } void ResumeByCheckpoint() override { - Y_ENSURE(false); + Y_ENSURE(PausedByCheckpoint); + PausedByCheckpoint = false; } bool IsPausedByCheckpoint() const override { - Y_ENSURE(false); + return PausedByCheckpoint; } void AddWatermark(TInstant) override { @@ -983,11 +994,17 @@ public: return IsLocalChannel; } + void SetCallback(IDqInputChannelCallbacks* callback) override { + Callback = callback; + } + std::weak_ptr<TDqChannelService> Service; std::shared_ptr<IChannelBuffer> Buffer; std::unique_ptr<TInputDeserializer> Deserializer; bool IsLocalChannel = false; IMemoryQuotaManager::TPtr ChannelQuotaManager; + IDqInputChannelCallbacks* Callback = nullptr; + bool PausedByCheckpoint = false; }; class TChannelServiceActor : public NActors::TActorBootstrapped<TChannelServiceActor> { diff --git a/ydb/library/yql/dq/runtime/dq_channel_service_pack.cpp b/ydb/library/yql/dq/runtime/dq_channel_service_pack.cpp index 51953083918..c9e3a100bae 100644 --- a/ydb/library/yql/dq/runtime/dq_channel_service_pack.cpp +++ b/ydb/library/yql/dq/runtime/dq_channel_service_pack.cpp @@ -8,6 +8,8 @@ #include <ydb/library/yql/dq/common/rope_over_buffer.h> #include <ydb/library/yql/dq/runtime/dq_packer_version_helper.h> +#include <ydb/library/actors/core/log.h> + namespace NYql::NDq { template<bool fast> @@ -59,6 +61,16 @@ public: Rows = 0; } + void Push(NDqProto::TCheckpoint&& checkpoint) override { + Flush(false); + Buffer->Push(TDataChunk(std::move(checkpoint), Buffer->GetLeading())); + } + + void Push(NDqProto::TWatermark&& watermark) override { + Flush(false); + Buffer->Push(TDataChunk(std::move(watermark), Buffer->GetLeading())); + } + void Push(NUdf::TUnboxedValue&& value) override { Packer.AddItem(value); Rows++; @@ -102,6 +114,16 @@ public: YQL_ENSURE(false, "Push to Wide Channel"); } + void Push(NDqProto::TCheckpoint&& checkpoint) override { + Flush(false); + Buffer->Push(TDataChunk(std::move(checkpoint), Buffer->GetLeading())); + } + + void Push(NDqProto::TWatermark&& watermark) override { + Flush(false); + Buffer->Push(TDataChunk(std::move(watermark), Buffer->GetLeading())); + } + void WidePush(NUdf::TUnboxedValue* values, ui32 width) override { Packer.AddWideItem(values, width); Rows++; @@ -140,6 +162,14 @@ public: YQL_ENSURE(false, "Push to Wide Channel"); } + void Push(NDqProto::TCheckpoint&& checkpoint) override { + Buffer->Push(TDataChunk(std::move(checkpoint), Buffer->GetLeading())); + } + + void Push(NDqProto::TWatermark&& watermark) override { + Buffer->Push(TDataChunk(std::move(watermark), Buffer->GetLeading())); + } + void WidePush(NUdf::TUnboxedValue* values, ui32 width) override { ui32 rows = NKikimr::NMiniKQL::TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value; Packer.AddWideItem(values, width); diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.h b/ydb/library/yql/dq/runtime/dq_input_channel.h index 2bb4a270ee7..9687e9b4157 100644 --- a/ydb/library/yql/dq/runtime/dq_input_channel.h +++ b/ydb/library/yql/dq/runtime/dq_input_channel.h @@ -3,6 +3,8 @@ #include "dq_channel_settings.h" #include "dq_transport.h" +#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h> + namespace NYql::NDq { struct TDqInputChannelStats : TDqInputStats { @@ -15,6 +17,12 @@ struct TDqInputChannelStats : TDqInputStats { bool PopResult = false; }; + struct IDqInputChannelCallbacks { + virtual void TakeCheckpoint(const NDqProto::TCheckpoint& checkpoint, ui64 channelId) = 0; + virtual ~IDqInputChannelCallbacks() = default; +}; + + class IDqInputChannel : public IDqInput { public: using TPtr = TIntrusivePtr<IDqInputChannel>; @@ -29,6 +37,10 @@ public: virtual void Bind(NActors::TActorId outputActorId, NActors::TActorId inputActorId) = 0; virtual bool IsLocal() const = 0; + + virtual void SetCallback(IDqInputChannelCallbacks* callback) { + Y_UNUSED(callback); + }; }; IDqInputChannel::TPtr CreateDqInputChannel(const TDqChannelSettings& settings, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index 9777a62aecd..701ea2ac664 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -1161,7 +1161,7 @@ private: AllocatedHolder->CheckForNotConsumedLinear(); } - LOG(TStringBuilder() << "task" << TaskId << ", execution finished, finish consumers"); + LOG(TStringBuilder() << "task " << TaskId << ", execution finished, finish consumers"); AllocatedHolder->Output->Finish(); return ERunStatus::Finished; } diff --git a/ydb/tests/fq/streaming/conftest.py b/ydb/tests/fq/streaming/conftest.py index ced6bcea9da..ddf2de082b5 100644 --- a/ydb/tests/fq/streaming/conftest.py +++ b/ydb/tests/fq/streaming/conftest.py @@ -37,7 +37,7 @@ def kikimr(request): "enable_match_recognize": True }, table_service_config={ - "dq_channel_version": 1, + "dq_channel_version": 2, "enable_watermarks": enable_watermarks, "enable_streaming_partition_balancing": enable_streaming_partition_balancing, }, |
