diff options
author | Hor911 <hor911@ydb.tech> | 2025-03-31 12:49:37 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-31 12:49:37 +0300 |
commit | 969a4e38a041e377fcace28a1953efbd54567a0c (patch) | |
tree | 4a6b8b653d8abba1e41b24310cca6e2f01d04c4b | |
parent | 29f5fc52a3662bda6e0fb1e26adee92eb6f8e9f5 (diff) | |
download | ydb-969a4e38a041e377fcace28a1953efbd54567a0c.tar.gz |
Legacy compatible scan pings (#16439)
-rw-r--r-- | ydb/core/kqp/common/simple/kqp_event_ids.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_events.h | 8 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h | 16 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp | 2 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 6 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/reader/actor/actor.cpp | 15 | ||||
-rw-r--r-- | ydb/core/tx/columnshard/engines/reader/actor/actor.h | 3 |
7 files changed, 33 insertions, 18 deletions
diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index a544080064..b9ad1addb1 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -94,6 +94,7 @@ struct TKqpComputeEvents { EvScanInitActor, EvRemoteScanData, EvRemoteScanDataAck, + EvScanPing, }; static_assert(Unused0 == EventSpaceBegin(TKikimrEvents::ES_KQP) + 200); diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h index 4b9d3f2924..cdb0496fe4 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_events.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h @@ -250,16 +250,22 @@ struct TEvKqpCompute { } }; + struct TEvScanPing : public NActors::TEventPB<TEvScanPing, NKikimrKqp::TEvScanPing, + TKqpComputeEvents::EvScanPing> + { + }; + struct TEvScanInitActor : public NActors::TEventPB<TEvScanInitActor, NKikimrKqp::TEvScanInitActor, TKqpComputeEvents::EvScanInitActor> { TEvScanInitActor() = default; - TEvScanInitActor(ui64 scanId, const NActors::TActorId& scanActor, ui32 generation, const ui64 tabletId) { + TEvScanInitActor(ui64 scanId, const NActors::TActorId& scanActor, ui32 generation, const ui64 tabletId, bool allowPings = false) { Record.SetScanId(scanId); ActorIdToProto(scanActor, Record.MutableScanActorId()); Record.SetGeneration(generation); Record.SetTabletId(tabletId); + Record.SetAllowPings(allowPings); } }; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h index db4b753e68..bcc0e28fe4 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h @@ -32,6 +32,7 @@ private: const ui64 FreeSpace = (ui64)8 << 20; bool NeedAck = true; bool Finished = false; + bool AllowPings = false; void DoAck() { if (Finished) { @@ -105,16 +106,17 @@ public: return !ActorId.has_value(); } - void Ping() { - if (ActorId) { - NActors::TActivationContext::AsActorContext().Send(*ActorId, new TEvKqpCompute::TEvScanDataAck(0, 0, 0), IEventHandle::FlagTrackDelivery, TabletId); + void PingIfNeeded() { + if (AllowPings && !!ActorId) { + NActors::TActivationContext::AsActorContext().Send(*ActorId, new TEvKqpCompute::TEvScanPing()); } } - void Start(const TActorId& actorId) { + void Start(const TActorId& actorId, bool allowPings) { AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "start_scanner")("actor_id", actorId); AFL_ENSURE(!ActorId); ActorId = actorId; + AllowPings = allowPings; DoAck(); } @@ -292,7 +294,7 @@ public: void PingAllScanners() { for (auto&& itTablet : ShardScanners) { - itTablet.second->Ping(); + itTablet.second->PingIfNeeded(); } } @@ -319,7 +321,7 @@ public: } } - void RegisterScannerActor(const ui64 tabletId, const ui64 generation, const TActorId& scanActorId) { + void RegisterScannerActor(const ui64 tabletId, const ui64 generation, const TActorId& scanActorId, bool allowPings) { auto state = GetShardState(tabletId); if (!state || generation != state->Generation) { AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "register_scanner_actor_dropped") @@ -338,7 +340,7 @@ public: state->ResetRetry(); AFL_ENSURE(ShardsByActorId.emplace(scanActorId, state).second); - GetShardScannerVerified(tabletId)->Start(scanActorId); + GetShardScannerVerified(tabletId)->Start(scanActorId, allowPings); } void StartScanner(TShardState& state) { diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp index d3a2ccc5d3..cd46ad65de 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp @@ -111,7 +111,7 @@ void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanInitActor::TPtr& } auto& msg = ev->Get()->Record; auto scanActorId = ActorIdFromProto(msg.GetScanActorId()); - InFlightShards.RegisterScannerActor(msg.GetTabletId(), msg.GetGeneration(), scanActorId); + InFlightShards.RegisterScannerActor(msg.GetTabletId(), msg.GetGeneration(), scanActorId, msg.GetAllowPings()); } void TKqpScanFetcherActor::HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) { diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index 46dee8ed2f..6621253bfc 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -639,7 +639,8 @@ message TEvScanInitActor { optional NActorsProto.TActorId ScanActorId = 2; optional uint32 Generation = 3; optional uint64 TabletId = 4; -}; + optional bool AllowPings = 5; +} message TEvScanError { optional Ydb.StatusIds.StatusCode Status = 1; @@ -648,6 +649,9 @@ message TEvScanError { optional uint64 TabletId = 4; } +message TEvScanPing { +} + message TEvKqpScanCursor { message TColumnShardScanPlain { } diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp index fb58915099..e70cfee4b8 100644 --- a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp @@ -65,7 +65,7 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) { ScheduleWakeup(TMonotonic::Now() + Timeout / 5);
// propagate self actor id // TODO: FlagSubscribeOnSession ?
- Send(ScanComputeActorId, new NKqp::TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen, TabletId),
+ Send(ScanComputeActorId, new NKqp::TEvKqpCompute::TEvScanInitActor(ScanId, ctx.SelfID, ScanGen, TabletId, true),
IEventHandle::FlagTrackDelivery);
Become(&TColumnShardScan::StateScan);
@@ -94,13 +94,6 @@ void TColumnShardScan::HandleScan(NColumnShard::TEvPrivate::TEvTaskProcessedResu void TColumnShardScan::HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev) {
auto g = Stats->MakeGuard("ack");
- if (ev->Get()->FreeSpace == 0 && ev->Get()->MaxChunksCount == 0) {
- if (!AckReceivedInstant) {
- LastResultInstant = TMonotonic::Now();
- }
- return;
- }
-
AFL_VERIFY(!AckReceivedInstant);
AckReceivedInstant = TMonotonic::Now();
@@ -119,6 +112,12 @@ void TColumnShardScan::HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev) ContinueProcessing();
}
+void TColumnShardScan::HandleScan(NKqp::TEvKqpCompute::TEvScanPing::TPtr&) {
+ if (!AckReceivedInstant) {
+ LastResultInstant = TMonotonic::Now();
+ }
+}
+
void TColumnShardScan::HandleScan(NActors::TEvents::TEvPoison::TPtr& /*ev*/) noexcept {
PassAway();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.h b/ydb/core/tx/columnshard/engines/reader/actor/actor.h index c0a859ccba..09c5f4279a 100644 --- a/ydb/core/tx/columnshard/engines/reader/actor/actor.h +++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.h @@ -46,6 +46,7 @@ private: "TabletId", TabletId)("ScanId", ScanId)("TxId", TxId)("ScanGen", ScanGen)("task_identifier", ReadMetadataRange->GetScanIdentifier())); switch (ev->GetTypeRewrite()) { hFunc(NKqp::TEvKqpCompute::TEvScanDataAck, HandleScan); + hFunc(NKqp::TEvKqpCompute::TEvScanPing, HandleScan); hFunc(NKqp::TEvKqp::TEvAbortExecution, HandleScan); hFunc(NActors::TEvents::TEvPoison, HandleScan); hFunc(TEvents::TEvUndelivered, HandleScan); @@ -60,6 +61,8 @@ private: void HandleScan(NKqp::TEvKqpCompute::TEvScanDataAck::TPtr& ev); + void HandleScan(NKqp::TEvKqpCompute::TEvScanPing::TPtr& ev); + // Returns true if it was able to produce new batch bool ProduceResults() noexcept; |