aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2022-05-24 18:07:06 +0300
committerVitalii Gridnev <gridnevvvit@gmail.com>2022-05-24 18:07:06 +0300
commit9cbe17130d53b1559b93d3922dc86b4be72562dd (patch)
tree7797441b75f6f1b7f62abc3fd24d3954c5e1c778
parent4a90c6ce2d1fc1a4ae269a7da400e60790e4dc61 (diff)
downloadydb-9cbe17130d53b1559b93d3922dc86b4be72562dd.tar.gz
[kqp] scan actor code cleanup: do not implement logic in switch-case to improve code readability
ref:f721bd69af3b707d59e80f9a42273c070c2cfbec
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp145
1 files changed, 79 insertions, 66 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 76aa3d711c3..f83a17f54e3 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -156,10 +156,10 @@ public:
hFunc(TEvPipeCache::TEvDeliveryProblem, HandleExecute);
hFunc(TEvPrivate::TEvRetryShard, HandleExecute);
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleExecute);
- hFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult, HandleExecute);
hFunc(TEvents::TEvUndelivered, HandleExecute);
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleExecute);
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
+ IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult);
default:
StateFuncBase(ev, ctx);
}
@@ -307,30 +307,8 @@ private:
}
PendingScanData.emplace_back(std::make_pair(ev, startTime));
- if (auto rlPath = GetRlPath()) {
- auto selfId = this->SelfId();
- auto as = TActivationContext::ActorSystem();
-
- auto onSendAllowed = [selfId, as]() mutable {
- as->Send(selfId, new TEvents::TEvWakeup(EEvWakeupTag::RlSendAllowedTag));
- };
-
- auto onSendTimeout = [selfId, as]() {
- as->Send(selfId, new TEvents::TEvWakeup(EEvWakeupTag::RlNoResourceTag));
- };
-
- const NRpcService::TRlFullPath rlFullPath {
- .CoordinationNode = rlPath->GetCoordinationNode(),
- .ResourcePath = rlPath->GetResourcePath(),
- .DatabaseName = rlPath->GetDatabase(),
- .Token = rlPath->GetToken()
- };
-
- auto rlActor = NRpcService::RateLimiterAcquireUseSameMailbox(
- rlFullPath, 0, RL_MAX_BATCH_DELAY,
- std::move(onSendAllowed), std::move(onSendTimeout), TActivationContext::AsActorContext());
-
- CA_LOG_D("Launch rate limiter actor: " << rlActor);
+ if (IsQuotingEnabled()) {
+ AcquireRateQuota();
} else {
ProcessScanData();
}
@@ -564,44 +542,7 @@ private:
switch (state.State) {
case EShardState::Starting:
case EShardState::Running: {
- Counters->ScanQueryShardDisconnect->Inc();
-
- if (state.TotalRetries >= MAX_TOTAL_SHARD_RETRIES) {
- CA_LOG_E("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
- << ", retries limit exceeded (" << state.TotalRetries << ")");
- return InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder()
- << "Retries limit with shard " << state.TabletId << " exceeded.");
- }
-
- if (state.RetryAttempt < MAX_SHARD_RETRIES) {
- state.RetryAttempt++;
- state.TotalRetries++;
- state.Generation = ++LastGeneration;
- state.ActorId = {};
- state.State = EShardState::Starting;
- state.SubscribedOnTablet = false;
-
- auto retryDelay = state.CalcRetryDelay();
- if (retryDelay) {
- CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
- << ", restarting scan from last received key " << PrintLastKey()
- << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")"
- << " schedule after " << retryDelay);
-
- state.RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay,
- new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard));
- } else {
- CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
- << ", restarting scan from last received key " << PrintLastKey()
- << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")");
-
- SendStartScanRequest(state, state.Generation);
- }
-
- return;
- }
-
- ResolveShard(state);
+ RetryDeliveryProblem(state);
return;
}
@@ -740,9 +681,6 @@ private:
StartTableScan();
}
- void HandleExecute(TEvTxProxySchemeCache::TEvInvalidateTableResult::TPtr&) {
- }
-
void HandleExecute(TEvents::TEvUndelivered::TPtr& ev) {
switch (ev->Get()->SourceType) {
case TEvDataShard::TEvKqpScan::EventType:
@@ -862,6 +800,81 @@ private:
IEventHandle::FlagTrackDelivery);
}
+ void RetryDeliveryProblem(TShardState& state) {
+ Counters->ScanQueryShardDisconnect->Inc();
+
+ if (state.TotalRetries >= MAX_TOTAL_SHARD_RETRIES) {
+ CA_LOG_E("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
+ << ", retries limit exceeded (" << state.TotalRetries << ")");
+ return InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder()
+ << "Retries limit with shard " << state.TabletId << " exceeded.");
+ }
+
+ // note: it might be possible that shard is already removed after successful split/merge operation and cannot be found
+ // in this case the next TEvKqpScan request will receive the delivery problem response.
+ // so after several consecutive delivery problem responses retry logic should
+ // resolve shard details again.
+ if (state.RetryAttempt >= MAX_SHARD_RETRIES) {
+ return ResolveShard(state);
+ }
+
+ state.RetryAttempt++;
+ state.TotalRetries++;
+ state.Generation = ++LastGeneration;
+ state.ActorId = {};
+ state.State = EShardState::Starting;
+ state.SubscribedOnTablet = false;
+
+ auto retryDelay = state.CalcRetryDelay();
+ if (retryDelay) {
+ CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
+ << ", restarting scan from last received key " << PrintLastKey()
+ << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")"
+ << " schedule after " << retryDelay);
+
+ state.RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay,
+ new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard));
+ } else {
+ CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
+ << ", restarting scan from last received key " << PrintLastKey()
+ << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")");
+
+ SendStartScanRequest(state, state.Generation);
+ }
+ }
+
+ bool IsQuotingEnabled() const {
+ const auto& rlPath = GetRlPath();
+ return rlPath.Defined();
+ }
+
+ void AcquireRateQuota() {
+ const auto& rlPath = GetRlPath();
+ auto selfId = this->SelfId();
+ auto as = TActivationContext::ActorSystem();
+
+ auto onSendAllowed = [selfId, as]() mutable {
+ as->Send(selfId, new TEvents::TEvWakeup(EEvWakeupTag::RlSendAllowedTag));
+ };
+
+ auto onSendTimeout = [selfId, as]() {
+ as->Send(selfId, new TEvents::TEvWakeup(EEvWakeupTag::RlNoResourceTag));
+ };
+
+ const NRpcService::TRlFullPath rlFullPath {
+ .CoordinationNode = rlPath->GetCoordinationNode(),
+ .ResourcePath = rlPath->GetResourcePath(),
+ .DatabaseName = rlPath->GetDatabase(),
+ .Token = rlPath->GetToken()
+ };
+
+ auto rlActor = NRpcService::RateLimiterAcquireUseSameMailbox(
+ rlFullPath, 0, RL_MAX_BATCH_DELAY,
+ std::move(onSendAllowed), std::move(onSendTimeout), TActivationContext::AsActorContext());
+
+ CA_LOG_D("Launch rate limiter actor: " << rlActor);
+ }
+
const TSmallVec<TSerializedTableRange> GetScanRanges(const TShardState& state) const {
// No any data read previously, return all ranges
if (!LastKey.DataSize()) {