| author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-05-24 18:07:06 +0300 |
|---|---|---|
| committer | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-05-24 18:07:06 +0300 |
| commit | 9cbe17130d53b1559b93d3922dc86b4be72562dd (patch) | |
| tree | 7797441b75f6f1b7f62abc3fd24d3954c5e1c778 | |
| parent | 4a90c6ce2d1fc1a4ae269a7da400e60790e4dc61 (diff) | |
| download | ydb-9cbe17130d53b1559b93d3922dc86b4be72562dd.tar.gz | |
[kqp] scan actor code cleanup: do not implement logic in switch-case to improve code readability
ref:f721bd69af3b707d59e80f9a42273c070c2cfbec
| -rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 145 |
|---|---|---|

1 file changed, 79 insertions, 66 deletions
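The commit message above names the refactoring applied in this change: logic that previously sat inline inside `switch`-`case` bodies is moved into named private methods (`RetryDeliveryProblem`, `IsQuotingEnabled`, `AcquireRateQuota`), so the dispatch code only routes events. Below is a minimal standalone sketch of that pattern under assumed, hypothetical names (`TConnectionHandler`, `OnDeliveryProblem`); it is not the YDB actor code and borrows none of its types. The actual commit diff follows.

```cpp
#include <iostream>

// Hypothetical, stripped-down handler used only to illustrate the pattern;
// the real scan compute actor dispatches actor-system events instead.
class TConnectionHandler {
public:
    enum class EState { Starting, Running, Closed };

    // The switch only routes: each case delegates to a named method instead of
    // carrying tens of inline lines, which is the readability point of the commit.
    void OnDeliveryProblem(EState state) {
        switch (state) {
            case EState::Starting:
            case EState::Running:
                RetryDeliveryProblem();
                return;
            case EState::Closed:
                std::cout << "delivery problem ignored: connection closed\n";
                return;
        }
    }

private:
    // The extracted logic gets a name, early returns, and a single place to test.
    void RetryDeliveryProblem() {
        if (TotalRetries >= MaxTotalRetries) {
            std::cout << "retry limit exceeded, giving up\n";
            return;
        }
        ++TotalRetries;
        std::cout << "scheduling retry #" << TotalRetries << "\n";
    }

    int TotalRetries = 0;
    static constexpr int MaxTotalRetries = 5;
};

int main() {
    TConnectionHandler handler;
    handler.OnDeliveryProblem(TConnectionHandler::EState::Running);
    handler.OnDeliveryProblem(TConnectionHandler::EState::Closed);
}
```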
```diff
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 76aa3d711c3..f83a17f54e3 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -156,10 +156,10 @@ public:
             hFunc(TEvPipeCache::TEvDeliveryProblem, HandleExecute);
             hFunc(TEvPrivate::TEvRetryShard, HandleExecute);
             hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleExecute);
-            hFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult, HandleExecute);
             hFunc(TEvents::TEvUndelivered, HandleExecute);
             hFunc(TEvInterconnect::TEvNodeDisconnected, HandleExecute);
             IgnoreFunc(TEvInterconnect::TEvNodeConnected);
+            IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult);
             default:
                 StateFuncBase(ev, ctx);
         }
@@ -307,30 +307,8 @@ private:
         }
         PendingScanData.emplace_back(std::make_pair(ev, startTime));
 
-        if (auto rlPath = GetRlPath()) {
-            auto selfId = this->SelfId();
-            auto as = TActivationContext::ActorSystem();
-
-            auto onSendAllowed = [selfId, as]() mutable {
-                as->Send(selfId, new TEvents::TEvWakeup(EEvWakeupTag::RlSendAllowedTag));
-            };
-
-            auto onSendTimeout = [selfId, as]() {
-                as->Send(selfId, new TEvents::TEvWakeup(EEvWakeupTag::RlNoResourceTag));
-            };
-
-            const NRpcService::TRlFullPath rlFullPath {
-                .CoordinationNode = rlPath->GetCoordinationNode(),
-                .ResourcePath = rlPath->GetResourcePath(),
-                .DatabaseName = rlPath->GetDatabase(),
-                .Token = rlPath->GetToken()
-            };
-
-            auto rlActor = NRpcService::RateLimiterAcquireUseSameMailbox(
-                rlFullPath, 0, RL_MAX_BATCH_DELAY,
-                std::move(onSendAllowed), std::move(onSendTimeout), TActivationContext::AsActorContext());
-
-            CA_LOG_D("Launch rate limiter actor: " << rlActor);
+        if (IsQuotingEnabled()) {
+            AcquireRateQuota();
         } else {
             ProcessScanData();
         }
@@ -564,44 +542,7 @@ private:
         switch (state.State) {
             case EShardState::Starting:
             case EShardState::Running: {
-                Counters->ScanQueryShardDisconnect->Inc();
-
-                if (state.TotalRetries >= MAX_TOTAL_SHARD_RETRIES) {
-                    CA_LOG_E("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
-                        << ", retries limit exceeded (" << state.TotalRetries << ")");
-                    return InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder()
-                        << "Retries limit with shard " << state.TabletId << " exceeded.");
-                }
-
-                if (state.RetryAttempt < MAX_SHARD_RETRIES) {
-                    state.RetryAttempt++;
-                    state.TotalRetries++;
-                    state.Generation = ++LastGeneration;
-                    state.ActorId = {};
-                    state.State = EShardState::Starting;
-                    state.SubscribedOnTablet = false;
-
-                    auto retryDelay = state.CalcRetryDelay();
-                    if (retryDelay) {
-                        CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
-                            << ", restarting scan from last received key " << PrintLastKey()
-                            << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")"
-                            << " schedule after " << retryDelay);
-
-                        state.RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay,
-                            new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard));
-                    } else {
-                        CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
-                            << ", restarting scan from last received key " << PrintLastKey()
-                            << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")");
-
-                        SendStartScanRequest(state, state.Generation);
-                    }
-
-                    return;
-                }
-
-                ResolveShard(state);
+                RetryDeliveryProblem(state);
                 return;
             }
 
@@ -740,9 +681,6 @@ private:
         StartTableScan();
     }
 
-    void HandleExecute(TEvTxProxySchemeCache::TEvInvalidateTableResult::TPtr&) {
-    }
-
     void HandleExecute(TEvents::TEvUndelivered::TPtr& ev) {
         switch (ev->Get()->SourceType) {
             case TEvDataShard::TEvKqpScan::EventType:
@@ -862,6 +800,81 @@ private:
             IEventHandle::FlagTrackDelivery);
     }
 
+    void RetryDeliveryProblem(TShardState& state) {
+        Counters->ScanQueryShardDisconnect->Inc();
+
+        if (state.TotalRetries >= MAX_TOTAL_SHARD_RETRIES) {
+            CA_LOG_E("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
+                << ", retries limit exceeded (" << state.TotalRetries << ")");
+            return InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder()
+                << "Retries limit with shard " << state.TabletId << " exceeded.");
+        }
+
+        // note: it might be possible that shard is already removed after successful split/merge operation and cannot be found
+        // in this case the next TEvKqpScan request will receive the delivery problem response.
+        // so after several consecutive delivery problem responses retry logic should
+        // resolve shard details again.
+        if (state.RetryAttempt >= MAX_SHARD_RETRIES) {
+            return ResolveShard(state);
+        }
+
+        state.RetryAttempt++;
+        state.TotalRetries++;
+        state.Generation = ++LastGeneration;
+        state.ActorId = {};
+        state.State = EShardState::Starting;
+        state.SubscribedOnTablet = false;
+
+        auto retryDelay = state.CalcRetryDelay();
+        if (retryDelay) {
+            CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
+                << ", restarting scan from last received key " << PrintLastKey()
+                << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")"
+                << " schedule after " << retryDelay);
+
+            state.RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay,
+                new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard));
+        } else {
+            CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
+                << ", restarting scan from last received key " << PrintLastKey()
+                << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")");
+
+            SendStartScanRequest(state, state.Generation);
+        }
+    }
+
+    bool IsQuotingEnabled() const {
+        const auto& rlPath = GetRlPath();
+        return rlPath.Defined();
+    }
+
+    void AcquireRateQuota() {
+        const auto& rlPath = GetRlPath();
+        auto selfId = this->SelfId();
+        auto as = TActivationContext::ActorSystem();
+
+        auto onSendAllowed = [selfId, as]() mutable {
+            as->Send(selfId, new TEvents::TEvWakeup(EEvWakeupTag::RlSendAllowedTag));
+        };
+
+        auto onSendTimeout = [selfId, as]() {
+            as->Send(selfId, new TEvents::TEvWakeup(EEvWakeupTag::RlNoResourceTag));
+        };
+
+        const NRpcService::TRlFullPath rlFullPath {
+            .CoordinationNode = rlPath->GetCoordinationNode(),
+            .ResourcePath = rlPath->GetResourcePath(),
+            .DatabaseName = rlPath->GetDatabase(),
+            .Token = rlPath->GetToken()
+        };
+
+        auto rlActor = NRpcService::RateLimiterAcquireUseSameMailbox(
+            rlFullPath, 0, RL_MAX_BATCH_DELAY,
+            std::move(onSendAllowed), std::move(onSendTimeout), TActivationContext::AsActorContext());
+
+        CA_LOG_D("Launch rate limiter actor: " << rlActor);
+    }
+
     const TSmallVec<TSerializedTableRange> GetScanRanges(const TShardState& state) const {
         // No any data read previously, return all ranges
         if (!LastKey.DataSize()) {
```
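For readers following the new `RetryDeliveryProblem` helper in the last hunk: it applies a two-level policy, failing the query once `TotalRetries` reaches `MAX_TOTAL_SHARD_RETRIES`, re-resolving the shard once `RetryAttempt` reaches `MAX_SHARD_RETRIES` (the shard may have disappeared after a split/merge), and otherwise re-issuing the scan, possibly after `CalcRetryDelay()`. The following is a condensed sketch of that decision order only, with hypothetical constant values and plain return codes standing in for the actor machinery.

```cpp
#include <iostream>

// Hypothetical limits; the real values are defined in the YDB sources.
constexpr int MAX_SHARD_RETRIES = 5;
constexpr int MAX_TOTAL_SHARD_RETRIES = 20;

enum class ERetryDecision { Fail, ResolveShardAgain, RetryScan };

// Mirrors the order of checks in RetryDeliveryProblem: the hard total cap is
// checked first, then the per-shard attempt cap that triggers re-resolution,
// and only then a plain retry of the scan request.
ERetryDecision DecideOnDeliveryProblem(int retryAttempt, int totalRetries) {
    if (totalRetries >= MAX_TOTAL_SHARD_RETRIES) {
        return ERetryDecision::Fail;              // InternalError(...) in the actor
    }
    if (retryAttempt >= MAX_SHARD_RETRIES) {
        return ERetryDecision::ResolveShardAgain; // shard may have been split/merged
    }
    return ERetryDecision::RetryScan;             // SendStartScanRequest, possibly delayed
}

int main() {
    std::cout << static_cast<int>(DecideOnDeliveryProblem(1, 3))  << "\n"; // RetryScan
    std::cout << static_cast<int>(DecideOnDeliveryProblem(5, 7))  << "\n"; // ResolveShardAgain
    std::cout << static_cast<int>(DecideOnDeliveryProblem(2, 20)) << "\n"; // Fail
}
```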