summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDmitry Kardymon <[email protected]>2026-05-14 18:50:50 +0300
committerGitHub <[email protected]>2026-05-14 15:50:50 +0000
commitd95de0baaf0791ef8a7764d498917b69ad503bf2 (patch)
tree67e463ebc00183c7c04b72a66ff32cb95d395588
parent139172ebe216409dcd8ec4ef10b6ba077f16b2fd (diff)
YQ-5105 checkpoints/watermarks in channels 2.0 (#35734)
Co-authored-by: Pisarenko Grigoriy <[email protected]>
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h5
-rw-r--r--ydb/core/kqp/executer_actor/kqp_planner.cpp7
-rw-r--r--ydb/core/kqp/ut/federated_query/datastreams/common.cpp4
-rw-r--r--ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp2
-rw-r--r--ydb/core/protos/kqp_physical.proto2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp1
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h4
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h13
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_events.proto3
-rw-r--r--ydb/library/yql/dq/runtime/dq_channel_service.cpp71
-rw-r--r--ydb/library/yql/dq/runtime/dq_channel_service.h22
-rw-r--r--ydb/library/yql/dq/runtime/dq_channel_service_impl.h33
-rw-r--r--ydb/library/yql/dq/runtime/dq_channel_service_pack.cpp30
-rw-r--r--ydb/library/yql/dq/runtime/dq_input_channel.h12
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp2
-rw-r--r--ydb/tests/fq/streaming/conftest.py2
16 files changed, 181 insertions, 32 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 520b910c4e5..4feb691cc4c 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -157,6 +157,7 @@ public:
, TasksGraph(Database, Request.Transactions, Request.TxAlloc, AggregationSettings, Counters, BufferActorId, UserToken)
, ChannelService(channelService)
, PartitionPruner(MakeHolder<TPartitionPruner>(Request.TxAlloc->HolderFactory, Request.TxAlloc->TypeEnv, std::move(partitionPrunerConfig)))
+ , EnableWatermarks(executerConfig.TableServiceConfig.GetEnableWatermarks())
{
ArrayBufferMinFillPercentage = executerConfig.TableServiceConfig.GetArrayBufferMinFillPercentage();
BufferPageAllocSize = executerConfig.TableServiceConfig.GetBufferPageAllocSize();
@@ -1291,7 +1292,7 @@ protected:
.BufferPageAllocSize = BufferPageAllocSize,
.Query = Query,
.CheckpointCoordinator = CheckpointCoordinatorId,
- .EnableWatermarks = Request.QueryPhysicalGraph && Request.QueryPhysicalGraph->GetPreparedQuery().GetPhysicalQuery().GetEnableWatermarks(),
+ .EnableWatermarks = EnableWatermarks,
});
auto err = Planner->PlanExecution();
@@ -1805,7 +1806,7 @@ protected:
TKqpTasksGraph TasksGraph;
std::shared_ptr<NYql::NDq::IDqChannelService> ChannelService;
THolder<TPartitionPruner> PartitionPruner;
-
+ bool EnableWatermarks = false;
private:
static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100);
};
diff --git a/ydb/core/kqp/executer_actor/kqp_planner.cpp b/ydb/core/kqp/executer_actor/kqp_planner.cpp
index 93f5629f98a..8ffc4d65ad4 100644
--- a/ydb/core/kqp/executer_actor/kqp_planner.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_planner.cpp
@@ -665,12 +665,7 @@ std::unique_ptr<IEventHandle> TKqpPlanner::PlanExecution() {
void TKqpPlanner::PrepareCheckpoints() {
const auto isStreamingQuery = UserRequestContext && UserRequestContext->IsStreamingQuery;
-
- if (!isStreamingQuery) {
- return;
- }
-
- const auto enableCheckpoints = static_cast<bool>(CheckpointCoordinatorId);
+ const auto enableCheckpoints = isStreamingQuery && static_cast<bool>(CheckpointCoordinatorId);
TasksGraph.BuildCheckpointingAndWatermarksMode(enableCheckpoints, EnableWatermarks);
if (!enableCheckpoints) {
diff --git a/ydb/core/kqp/ut/federated_query/datastreams/common.cpp b/ydb/core/kqp/ut/federated_query/datastreams/common.cpp
index e857e21c350..9a6df86ab16 100644
--- a/ydb/core/kqp/ut/federated_query/datastreams/common.cpp
+++ b/ydb/core/kqp/ut/federated_query/datastreams/common.cpp
@@ -40,7 +40,7 @@ NKikimrConfig::TAppConfig& TStreamingTestFixture::SetupAppConfig() {
EnsureNotInitialized("AppConfig");
auto& result = AppConfig.emplace();
- result.MutableTableServiceConfig()->SetDqChannelVersion(1u);
+ result.MutableTableServiceConfig()->SetDqChannelVersion(2u);
return result;
}
@@ -98,7 +98,7 @@ std::shared_ptr<TKikimrRunner> TStreamingTestFixture::GetKikimrRunner() {
queryServiceConfig.SetEnableMatchRecognize(true);
auto& tableServiceConfig = *AppConfig->MutableTableServiceConfig();
- tableServiceConfig.SetDqChannelVersion(1u);
+ tableServiceConfig.SetDqChannelVersion(2u);
LogSettings
.AddLogPriority(NKikimrServices::STREAMS_STORAGE_SERVICE, NLog::PRI_DEBUG)
diff --git a/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp b/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp
index 786ad264012..7a717a6e6e7 100644
--- a/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/datastreams/streaming_ddl_ut.cpp
@@ -1241,7 +1241,7 @@ Y_UNIT_TEST_SUITE(KqpStreamingQueriesDdl) {
CREATE STREAMING QUERY `{query_name}` AS
DO BEGIN
PRAGMA ydb.HashJoinMode = "map";
- PRAGMA ydb.DqChannelVersion = "1";
+ PRAGMA ydb.DqChannelVersion = "2";
INSERT INTO `{pq_source}`.`{output_topic}`
SELECT
diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto
index f9addbc7422..b81a70b9dbd 100644
--- a/ydb/core/protos/kqp_physical.proto
+++ b/ydb/core/protos/kqp_physical.proto
@@ -746,6 +746,6 @@ message TKqpPhyQuery {
optional bool ForceImmediateEffectsExecution = 15;
uint32 LangVer = 16;
optional bool DisableCheckpoints = 17;
- optional bool EnableWatermarks = 18;
+ optional bool EnableWatermarks = 18 [deprecated=true];
optional EIsolationLevel DefaultTxMode = 19;
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
index 0342e8f41ff..f1e07513b97 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_checkpoints.cpp
@@ -448,6 +448,7 @@ void TDqComputeActorCheckpoints::DoCheckpoint() {
if (SaveState()) {
LOG_PCP_T("Injecting checkpoint barrier to outputs");
ComputeActor->InjectBarrierToOutputs(*PendingCheckpoint.Checkpoint);
+ ComputeActor->ResumeInputsByCheckpoint();
TryToSavePendingCheckpoint();
}
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 9f561a21245..ae40d3b4e0f 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1250,7 +1250,9 @@ protected:
if (!Checkpoints) {
Checkpoints = new TDqComputeActorCheckpoints(this->SelfId(), TxId, Task, this);
Checkpoints->Init(this->SelfId(), this->RegisterWithSameMailbox(Checkpoints));
- Channels->SetCheckpointsSupport();
+ if (Channels) {
+ Channels->SetCheckpointsSupport();
+ }
}
TAutoPtr<NActors::IEventHandle> handle = new NActors::IEventHandle(Checkpoints->SelfId(), ev->Sender, ev->Release().Release());
Checkpoints->Receive(handle);
diff --git a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
index b5d4bc63ccc..eec3c834b5d 100644
--- a/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
+++ b/ydb/library/yql/dq/actors/compute/dq_sync_compute_actor_base.h
@@ -3,11 +3,12 @@
#include "dq_compute_actor_impl.h"
#include "dq_compute_actor_async_input_helper.h"
#include <ydb/library/yql/dq/actors/spilling/spiller_factory.h>
+#include <ydb/library/yql/dq/runtime/dq_input_channel.h>
namespace NYql::NDq {
template<typename TDerived>
-class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActorAsyncInputHelperSync> {
+class TDqSyncComputeActorBase: public TDqComputeActorBase<TDerived, TComputeActorAsyncInputHelperSync>, public IDqInputChannelCallbacks {
using TBase = TDqComputeActorBase<TDerived, TComputeActorAsyncInputHelperSync>;
public:
using TDqComputeActorBase<TDerived, TComputeActorAsyncInputHelperSync>::TDqComputeActorBase;
@@ -285,6 +286,7 @@ protected:
for (auto& [channelId, channel] : this->InputChannelsMap) {
channel.Channel = TaskRunner->GetInputChannel(channelId);
+ channel.Channel->SetCallback(this);
}
for (auto& [inputIndex, source] : this->SourcesMap) {
@@ -393,6 +395,7 @@ protected:
this->ProcessOutputsState.AllOutputsFinished &= outputChannel.Finished;
this->ProcessOutputsState.DataWasSent |= (!wasFinished && outputChannel.Finished) || sentChunks;
}
+
void DrainAsyncOutput(ui64 outputIndex, typename TBase::TAsyncOutputInfoBase& outputInfo) override final {
this->ProcessOutputsState.AllOutputsFinished &= outputInfo.Finished;
if (outputInfo.Finished && !this->Checkpoints) {
@@ -431,6 +434,14 @@ protected:
this->ProcessOutputsState.DataWasSent |= outputInfo.Finished || sent;
}
+ void TakeCheckpoint(const NDqProto::TCheckpoint& checkpoint, ui64 channelId) override {
+ CA_LOG_T("Take checkpoint from channelId: " << channelId << ", checkpoint: " << checkpoint.ShortDebugString());
+ auto* inputChannel = this->InputChannelsMap.FindPtr(channelId);
+ YQL_ENSURE(inputChannel, "task: " << this->Task.GetId() << ", unknown input channelId: " << channelId);
+ inputChannel->Pause(checkpoint);
+ this->Checkpoints->RegisterCheckpoint(checkpoint, channelId);
+ }
+
protected:
TIntrusivePtr<IDqTaskRunner> TaskRunner;
diff --git a/ydb/library/yql/dq/actors/protos/dq_events.proto b/ydb/library/yql/dq/actors/protos/dq_events.proto
index 1a46cb0ff2b..be26c0ababe 100644
--- a/ydb/library/yql/dq/actors/protos/dq_events.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_events.proto
@@ -269,6 +269,9 @@ message TEvChannelDataV2 {
optional bool Leading = 13;
optional bool Finished = 14;
optional uint64 ConfirmedPopBytes = 15;
+
+ optional TCheckpoint Checkpoint = 16;
+ optional TWatermark Watermark = 17;
};
message TEvChannelAckV2 {
diff --git a/ydb/library/yql/dq/runtime/dq_channel_service.cpp b/ydb/library/yql/dq/runtime/dq_channel_service.cpp
index 5a3b3c2af79..41af67b2b97 100644
--- a/ydb/library/yql/dq/runtime/dq_channel_service.cpp
+++ b/ydb/library/yql/dq/runtime/dq_channel_service.cpp
@@ -3,6 +3,7 @@
#include "dq_arrow_helpers.h"
#include "dq_channel_service_impl.h"
+#include "dq_output.h"
#include <ydb/library/actors/core/log.h>
#include <ydb/library/yql/dq/actors/dq.h>
@@ -50,6 +51,19 @@ void BufferToData(TDataChunk& data, TBuffer&& buffer) {
data.Finished = ReadNumber<bool>(source);
data.Timestamp = TInstant::MicroSeconds(ReadNumber<ui64>(source));
+ bool hasCheckpoint = ReadNumber<bool>(source);
+ if (hasCheckpoint) {
+ data.Checkpoint = NDqProto::TCheckpoint{};
+ data.Checkpoint->SetId(ReadNumber<ui64>(source));
+ data.Checkpoint->SetGeneration(ReadNumber<ui64>(source));
+ data.Checkpoint->SetType(static_cast<NYql::NDqProto::ECheckpointType>(ReadNumber<ui8>(source)));
+ }
+ bool hasWatermark = ReadNumber<bool>(source);
+ if (hasWatermark) {
+ data.Watermark = NDqProto::TWatermark{};
+ data.Watermark->SetTimestampUs(ReadNumber<ui64>(source));
+ }
+
ui64 size = ReadNumber<ui64>(source);
YQL_ENSURE(size == source.size(), "Spilled data is corrupted");
data.Buffer = TChunkedBuffer(source, sharedBuffer);
@@ -65,6 +79,16 @@ TChunkedBuffer DataToBuffer(TDataChunk&& data) {
AppendNumber<bool>(result, data.Leading);
AppendNumber<bool>(result, data.Finished);
AppendNumber<ui64>(result, data.Timestamp.MicroSeconds());
+ AppendNumber<bool>(result, !data.Checkpoint.Empty());
+ if (data.Checkpoint) {
+ AppendNumber<ui64>(result, data.Checkpoint->GetId());
+ AppendNumber<ui64>(result, data.Checkpoint->GetGeneration());
+ AppendNumber<ui8>(result, data.Checkpoint->GetType());
+ }
+ AppendNumber<bool>(result, !data.Watermark.Empty());
+ if (data.Watermark) {
+ AppendNumber<ui64>(result, data.Watermark->GetTimestampUs());
+ }
AppendNumber<ui64>(result, data.Buffer.Size());
result.Append(std::move(data.Buffer));
@@ -1023,6 +1047,14 @@ void TNodeState::SendMessage(std::shared_ptr<TOutputItem> item) {
}
ev->Record.SetConfirmedPopBytes(item->Descriptor->RemotePopBytes.load());
+ if (item->Data.Checkpoint) {
+ *ev->Record.MutableCheckpoint() = item->Data.Checkpoint.GetRef();
+ }
+
+ if (item->Data.Watermark) {
+ *ev->Record.MutableWatermark() = item->Data.Watermark.GetRef();
+ }
+
ui32 flags = NActors::IEventHandle::FlagTrackDelivery;
if (!Subscribed.exchange(true)) {
flags |= NActors::IEventHandle::FlagSubscribeOnSession;
@@ -1135,7 +1167,14 @@ void TNodeState::HandleChannelData(TEvDqCompute::TEvChannelDataV2::TPtr& ev) {
// data.Timestamp = TInstant::MicroSeconds(record.GetSendTime());
}
data.Bytes = record.GetBytes();
- Y_ENSURE(data.Bytes > data.Buffer.Size()); // record.GetBytes() == data.Buffer.Size() + const
+ Y_ENSURE(data.Bytes > data.Buffer.Size(), "data.Bytes " << data.Bytes << " data.Buffer.Size() " << data.Buffer.Size()); // record.GetBytes() == data.Buffer.Size() + const
+
+ if (record.HasWatermark()) {
+ data.Watermark = record.GetWatermark();
+ }
+ if (record.HasCheckpoint()) {
+ data.Checkpoint = record.GetCheckpoint();
+ }
if (descriptor->PushDataChunk(std::move(data))) {
UpdateProgress(descriptor);
}
@@ -2093,12 +2132,31 @@ TString TDqChannelService::GetDebugInfo() {
// TFastDqOutputChannel::
bool TFastDqInputChannel::Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>& watermark) {
- Y_UNUSED(watermark);
+
+ if (PausedByCheckpoint) {
+ return false;
+ }
TDataChunk chunk;
- auto popResult = Buffer->Pop(chunk) && !chunk.Buffer.Empty();
+ bool popResult = Buffer->Pop(chunk);
+ PushStats.PopTime = TInstant::Now();
+ PushStats.PopResult = popResult;
+
+ if (popResult && chunk.Checkpoint) {
+ if (Callback) {
+ Callback->TakeCheckpoint(*chunk.Checkpoint, GetChannelId());
+ }
+ Y_ENSURE(batch.RowCount() == 0);
+ return false;
+ }
- if (popResult) {
+ if (popResult && chunk.Watermark) {
+ watermark = TInstant::MicroSeconds(chunk.Watermark->GetTimestampUs());
+ return true;
+ }
+
+ bool hasData = popResult && !chunk.Buffer.Empty();
+ if (hasData) {
if (chunk.TransportVersion != Deserializer->TransportVersion || chunk.PackerVersion != Deserializer->PackerVersion) {
auto deserializer = CreateDeserializer(Deserializer->RowType, chunk.TransportVersion, chunk.PackerVersion, Nothing(), Deserializer->HolderFactory);
Deserializer = std::move(deserializer);
@@ -2107,10 +2165,7 @@ bool TFastDqInputChannel::Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMay
Y_ENSURE(batch.RowCount() > 0);
}
- PushStats.PopTime = TInstant::Now();
- PushStats.PopResult = popResult;
-
- return popResult;
+ return hasData;
}
void TFastDqOutputChannel::Bind(NActors::TActorId outputActorId, NActors::TActorId inputActorId) {
diff --git a/ydb/library/yql/dq/runtime/dq_channel_service.h b/ydb/library/yql/dq/runtime/dq_channel_service.h
index 9b6ad96978a..f50dfd76d69 100644
--- a/ydb/library/yql/dq/runtime/dq_channel_service.h
+++ b/ydb/library/yql/dq/runtime/dq_channel_service.h
@@ -6,7 +6,9 @@
#include <ydb/library/actors/core/actorid.h>
#include <ydb/library/actors/core/actorsystem.h>
+#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
+
#include <ydb/library/yql/dq/proto/dq_transport.pb.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_pack.h>
@@ -64,6 +66,24 @@ public:
Timestamp = TInstant::Now();
}
+ TDataChunk(
+ NDqProto::TCheckpoint&& checkpoint,
+ bool leading)
+ : Bytes(1)
+ , Leading(leading)
+ , Timestamp(TInstant::Now())
+ , Checkpoint(std::move(checkpoint))
+ {}
+
+ TDataChunk(
+ NDqProto::TWatermark&& watermark,
+ bool leading)
+ : Bytes (1)
+ , Leading(leading)
+ , Timestamp(TInstant::Now())
+ , Watermark(std::move(watermark))
+ {}
+
TChunkedBuffer Buffer;
ui64 Rows = 0;
@@ -73,6 +93,8 @@ public:
bool Leading = false;
bool Finished = false;
TInstant Timestamp;
+ TMaybe<NDqProto::TCheckpoint> Checkpoint;
+ TMaybe<NDqProto::TWatermark> Watermark;
};
class IChannelBuffer {
diff --git a/ydb/library/yql/dq/runtime/dq_channel_service_impl.h b/ydb/library/yql/dq/runtime/dq_channel_service_impl.h
index 818cfa41c92..48bd0115e73 100644
--- a/ydb/library/yql/dq/runtime/dq_channel_service_impl.h
+++ b/ydb/library/yql/dq/runtime/dq_channel_service_impl.h
@@ -90,6 +90,8 @@ public:
virtual ~TOutputSerializer() {};
virtual void Push(NUdf::TUnboxedValue&& value) = 0;
+ virtual void Push(NDqProto::TCheckpoint&& checkpoint) = 0;
+ virtual void Push(NDqProto::TWatermark&& watermark) = 0;
virtual void WidePush(NUdf::TUnboxedValue* values, ui32 width) = 0;
virtual void Flush(bool finished) = 0;
@@ -787,12 +789,16 @@ public:
}
}
- void Push(NDqProto::TWatermark&&) override {
- Y_ENSURE(false);
+ void Push(NDqProto::TWatermark&& watermark) override {
+ if (!Serializer->Buffer->IsFinished()) {
+ Serializer->Push(std::move(watermark));
+ }
}
- void Push(NDqProto::TCheckpoint&&) override {
- Y_ENSURE(false);
+ void Push(NDqProto::TCheckpoint&& checkpoint) override {
+ if (!Serializer->Buffer->IsFinished()) {
+ Serializer->Push(std::move(checkpoint));
+ }
}
void Finish() override {
@@ -903,13 +909,16 @@ public:
}
bool Empty() const override {
+ if (PausedByCheckpoint) {
+ return true;
+ }
return Buffer->IsEmpty();
}
bool Pop(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe<TInstant>& watermark) override;
bool IsFinished() const override {
- return Buffer->IsFinished();
+ return Buffer->IsFinished() && !PausedByCheckpoint;
}
NKikimr::NMiniKQL::TType* GetInputType() const override {
@@ -927,15 +936,17 @@ public:
}
void PauseByCheckpoint() override {
- Y_ENSURE(false);
+ Y_ENSURE(!PausedByCheckpoint);
+ PausedByCheckpoint = true;
}
void ResumeByCheckpoint() override {
- Y_ENSURE(false);
+ Y_ENSURE(PausedByCheckpoint);
+ PausedByCheckpoint = false;
}
bool IsPausedByCheckpoint() const override {
- Y_ENSURE(false);
+ return PausedByCheckpoint;
}
void AddWatermark(TInstant) override {
@@ -983,11 +994,17 @@ public:
return IsLocalChannel;
}
+ void SetCallback(IDqInputChannelCallbacks* callback) override {
+ Callback = callback;
+ }
+
std::weak_ptr<TDqChannelService> Service;
std::shared_ptr<IChannelBuffer> Buffer;
std::unique_ptr<TInputDeserializer> Deserializer;
bool IsLocalChannel = false;
IMemoryQuotaManager::TPtr ChannelQuotaManager;
+ IDqInputChannelCallbacks* Callback = nullptr;
+ bool PausedByCheckpoint = false;
};
class TChannelServiceActor : public NActors::TActorBootstrapped<TChannelServiceActor> {
diff --git a/ydb/library/yql/dq/runtime/dq_channel_service_pack.cpp b/ydb/library/yql/dq/runtime/dq_channel_service_pack.cpp
index 51953083918..c9e3a100bae 100644
--- a/ydb/library/yql/dq/runtime/dq_channel_service_pack.cpp
+++ b/ydb/library/yql/dq/runtime/dq_channel_service_pack.cpp
@@ -8,6 +8,8 @@
#include <ydb/library/yql/dq/common/rope_over_buffer.h>
#include <ydb/library/yql/dq/runtime/dq_packer_version_helper.h>
+#include <ydb/library/actors/core/log.h>
+
namespace NYql::NDq {
template<bool fast>
@@ -59,6 +61,16 @@ public:
Rows = 0;
}
+ void Push(NDqProto::TCheckpoint&& checkpoint) override {
+ Flush(false);
+ Buffer->Push(TDataChunk(std::move(checkpoint), Buffer->GetLeading()));
+ }
+
+ void Push(NDqProto::TWatermark&& watermark) override {
+ Flush(false);
+ Buffer->Push(TDataChunk(std::move(watermark), Buffer->GetLeading()));
+ }
+
void Push(NUdf::TUnboxedValue&& value) override {
Packer.AddItem(value);
Rows++;
@@ -102,6 +114,16 @@ public:
YQL_ENSURE(false, "Push to Wide Channel");
}
+ void Push(NDqProto::TCheckpoint&& checkpoint) override {
+ Flush(false);
+ Buffer->Push(TDataChunk(std::move(checkpoint), Buffer->GetLeading()));
+ }
+
+ void Push(NDqProto::TWatermark&& watermark) override {
+ Flush(false);
+ Buffer->Push(TDataChunk(std::move(watermark), Buffer->GetLeading()));
+ }
+
void WidePush(NUdf::TUnboxedValue* values, ui32 width) override {
Packer.AddWideItem(values, width);
Rows++;
@@ -140,6 +162,14 @@ public:
YQL_ENSURE(false, "Push to Wide Channel");
}
+ void Push(NDqProto::TCheckpoint&& checkpoint) override {
+ Buffer->Push(TDataChunk(std::move(checkpoint), Buffer->GetLeading()));
+ }
+
+ void Push(NDqProto::TWatermark&& watermark) override {
+ Buffer->Push(TDataChunk(std::move(watermark), Buffer->GetLeading()));
+ }
+
void WidePush(NUdf::TUnboxedValue* values, ui32 width) override {
ui32 rows = NKikimr::NMiniKQL::TArrowBlock::From(values[width - 1]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
Packer.AddWideItem(values, width);
diff --git a/ydb/library/yql/dq/runtime/dq_input_channel.h b/ydb/library/yql/dq/runtime/dq_input_channel.h
index 2bb4a270ee7..9687e9b4157 100644
--- a/ydb/library/yql/dq/runtime/dq_input_channel.h
+++ b/ydb/library/yql/dq/runtime/dq_input_channel.h
@@ -3,6 +3,8 @@
#include "dq_channel_settings.h"
#include "dq_transport.h"
+#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
+
namespace NYql::NDq {
struct TDqInputChannelStats : TDqInputStats {
@@ -15,6 +17,12 @@ struct TDqInputChannelStats : TDqInputStats {
bool PopResult = false;
};
+ struct IDqInputChannelCallbacks {
+ virtual void TakeCheckpoint(const NDqProto::TCheckpoint& checkpoint, ui64 channelId) = 0;
+ virtual ~IDqInputChannelCallbacks() = default;
+};
+
+
class IDqInputChannel : public IDqInput {
public:
using TPtr = TIntrusivePtr<IDqInputChannel>;
@@ -29,6 +37,10 @@ public:
virtual void Bind(NActors::TActorId outputActorId, NActors::TActorId inputActorId) = 0;
virtual bool IsLocal() const = 0;
+
+ virtual void SetCallback(IDqInputChannelCallbacks* callback) {
+ Y_UNUSED(callback);
+ };
};
IDqInputChannel::TPtr CreateDqInputChannel(const TDqChannelSettings& settings, const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv);
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index 9777a62aecd..701ea2ac664 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -1161,7 +1161,7 @@ private:
AllocatedHolder->CheckForNotConsumedLinear();
}
- LOG(TStringBuilder() << "task" << TaskId << ", execution finished, finish consumers");
+ LOG(TStringBuilder() << "task " << TaskId << ", execution finished, finish consumers");
AllocatedHolder->Output->Finish();
return ERunStatus::Finished;
}
diff --git a/ydb/tests/fq/streaming/conftest.py b/ydb/tests/fq/streaming/conftest.py
index ced6bcea9da..ddf2de082b5 100644
--- a/ydb/tests/fq/streaming/conftest.py
+++ b/ydb/tests/fq/streaming/conftest.py
@@ -37,7 +37,7 @@ def kikimr(request):
"enable_match_recognize": True
},
table_service_config={
- "dq_channel_version": 1,
+ "dq_channel_version": 2,
"enable_watermarks": enable_watermarks,
"enable_streaming_partition_balancing": enable_streaming_partition_balancing,
},