aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHor911 <hor911@ydb.tech>2025-03-31 12:49:37 +0300
committerGitHub <noreply@github.com>2025-03-31 12:49:37 +0300
commit969a4e38a041e377fcace28a1953efbd54567a0c (patch)
tree4a6b8b653d8abba1e41b24310cca6e2f01d04c4b
parent29f5fc52a3662bda6e0fb1e26adee92eb6f8e9f5 (diff)
downloadydb-969a4e38a041e377fcace28a1953efbd54567a0c.tar.gz
Legacy compatible scan pings (#16439)
-rw-r--r--ydb/core/kqp/common/simple/kqp_event_ids.h1
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_events.h8
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h16
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp2
-rw-r--r--ydb/core/protos/kqp.proto6
-rw-r--r--ydb/core/tx/columnshard/engines/reader/actor/actor.cpp15
-rw-r--r--ydb/core/tx/columnshard/engines/reader/actor/actor.h3
7 files changed, 33 insertions, 18 deletions
diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h
index a544080064..b9ad1addb1 100644
--- a/ydb/core/kqp/common/simple/kqp_event_ids.h
+++ b/ydb/core/kqp/common/simple/kqp_event_ids.h
@@ -94,6 +94,7 @@ struct TKqpComputeEvents {
EvScanInitActor,
EvRemoteScanData,
EvRemoteScanDataAck,
+ EvScanPing,
};
static_assert(Unused0 == EventSpaceBegin(TKikimrEvents::ES_KQP) + 200);
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h
index 4b9d3f2924..cdb0496fe4 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_events.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h
@@ -250,16 +250,22 @@ struct TEvKqpCompute {
}
};
+ struct TEvScanPing : public NActors::TEventPB<TEvScanPing, NKikimrKqp::TEvScanPing,
+ TKqpComputeEvents::EvScanPing>
+ {
+ };
+
struct TEvScanInitActor : public NActors::TEventPB<TEvScanInitActor, NKikimrKqp::TEvScanInitActor,
TKqpComputeEvents::EvScanInitActor>
{
TEvScanInitActor() = default;
- TEvScanInitActor(ui64 scanId, const NActors::TActorId& scanActor, ui32 generation, const ui64 tabletId) {
+ TEvScanInitActor(ui64 scanId, const NActors::TActorId& scanActor, ui32 generation, const ui64 tabletId, bool allowPings = false) {
Record.SetScanId(scanId);
ActorIdToProto(scanActor, Record.MutableScanActorId());
Record.SetGeneration(generation);
Record.SetTabletId(tabletId);
+ Record.SetAllowPings(allowPings);
}
};
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
index db4b753e68..bcc0e28fe4 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
@@ -32,6 +32,7 @@ private:
const ui64 FreeSpace = (ui64)8 << 20;
bool NeedAck = true;
bool Finished = false;
+ bool AllowPings = false;
void DoAck() {
if (Finished) {
@@ -105,16 +106,17 @@ public:
return !ActorId.has_value();
}
- void Ping() {
- if (ActorId) {
- NActors::TActivationContext::AsActorContext().Send(*ActorId, new TEvKqpCompute::TEvScanDataAck(0, 0, 0), IEventHandle::FlagTrackDelivery, TabletId);
+ void PingIfNeeded() {
+ if (AllowPings && !!ActorId) {
+ NActors::TActivationContext::AsActorContext().Send(*ActorId, new TEvKqpCompute::TEvScanPing());
}
}
- void Start(const TActorId& actorId) {
+ void Start(const TActorId& actorId, bool allowPings) {
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "start_scanner")("actor_id", actorId);
AFL_ENSURE(!ActorId);
ActorId = actorId;
+ AllowPings = allowPings;
DoAck();
}
@@ -292,7 +294,7 @@ public:
void PingAllScanners() {
for (auto&& itTablet : ShardScanners) {
- itTablet.second->Ping();
+ itTablet.second->PingIfNeeded();
}
}
@@ -319,7 +321,7 @@ public:
}
}
- void RegisterScannerActor(const ui64 tabletId, const ui64 generation, const TActorId& scanActorId) {
+ void RegisterScannerActor(const ui64 tabletId, const ui64 generation, const TActorId& scanActorId, bool allowPings) {
auto state = GetShardState(tabletId);
if (!state || generation != state->Generation) {
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "register_scanner_actor_dropped")
@@ -338,7 +340,7 @@ public:
state->ResetRetry();
AFL_ENSURE(ShardsByActorId.emplace(scanActorId, state).second);
- GetShardScannerVerified(tabletId)->Start(scanActorId);
+ GetShardScannerVerified(tabletId)->Start(scanActorId, allowPings);
}
void StartScanner(TShardState& state) {
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
index d3a2ccc5d3..cd46ad65de 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
@@ -111,7 +111,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanInitActor::TPtr&
}
auto& msg = ev->Get()->Record;
auto scanActorId = ActorIdFromProto(msg.GetScanActorId());
- InFlightShards.RegisterScannerActor(msg.GetTabletId(), msg.GetGeneration(), scanActorId);
+ InFlightShards.RegisterScannerActor(msg.GetTabletId(), msg.GetGeneration(), scanActorId, msg.GetAllowPings());
}
void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) {
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index 46dee8ed2f..6621253bfc 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -639,7 +639,8 @@ message TEvScanInitActor {
optional NActorsProto.TActorId ScanActorId = 2;
optional uint32 Generation = 3;
optional uint64 TabletId = 4;
-};
+ optional bool AllowPings = 5;
+}
message TEvScanError {
optional Ydb.StatusIds.StatusCode Status = 1;
@@ -648,6 +649,9 @@ message TEvScanError {
optional uint64 TabletId = 4;
}
+message TEvScanPing {
+}
+
message TEvKqpScanCursor {
message TColumnShardScanPlain {
}
diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
index fb58915099..e70cfee4b8 100644
--- a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
@@ -65,7 +65,7 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
ScheduleWakeup(TMonotonic::Now() + Timeout / 5);
// propagate self actor id // TODO: FlagSubscribeOnSession ?
- Send(ScanComputeActorId, new NKqp::TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen, TabletId),
+ Send(ScanComputeActorId, new NKqp::TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen, TabletId, true),
IEventHandle::FlagTrackDelivery);
Become(&TColumnShardScan::StateScan);
@@ -94,13 +94,6 @@ void TColumnShardScan::HandleScan(NColumnShard::TEvPrivate::TEvTaskProcessedResu
void TColumnShardScan::HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev) {
auto g = Stats->MakeGuard("ack");
- if (ev->Get()->FreeSpace == 0 && ev->Get()->MaxChunksCount == 0) {
- if (!AckReceivedInstant) {
- LastResultInstant = TMonotonic::Now();
- }
- return;
- }
-
AFL_VERIFY(!AckReceivedInstant);
AckReceivedInstant = TMonotonic::Now();
@@ -119,6 +112,12 @@ void TColumnShardScan::HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev)
ContinueProcessing();
}
+void TColumnShardScan::HandleScan(NKqp::TEvKqpCompute::TEvScanPing::TPtr&) {
+ if (!AckReceivedInstant) {
+ LastResultInstant = TMonotonic::Now();
+ }
+}
+
void TColumnShardScan::HandleScan(NActors::TEvents::TEvPoison::TPtr& /*ev*/) noexcept {
PassAway();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.h b/ydb/core/tx/columnshard/engines/reader/actor/actor.h
index c0a859ccba..09c5f4279a 100644
--- a/ydb/core/tx/columnshard/engines/reader/actor/actor.h
+++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.h
@@ -46,6 +46,7 @@ private:
"TabletId", TabletId)("ScanId", ScanId)("TxId", TxId)("ScanGen", ScanGen)("task_identifier", ReadMetadataRange->GetScanIdentifier()));
switch (ev->GetTypeRewrite()) {
hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleScan);
+ hFunc(NKqp::TEvKqpCompute::TEvScanPing, HandleScan);
hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleScan);
hFunc(NActors::TEvents::TEvPoison, HandleScan);
hFunc(TEvents::TEvUndelivered, HandleScan);
@@ -60,6 +61,8 @@ private:
void HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev);
+ void HandleScan(NKqp::TEvKqpCompute::TEvScanPing::TPtr& ev);
+
// Returns true if it was able to produce new batch
bool ProduceResults() noexcept;