diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-09 15:56:53 +0300 |
---|---|---|
committer | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-09 15:56:53 +0300 |
commit | 513d8035935699ef5f29d1f58750eadd48f5de6d (patch) | |
tree | 93932ed220edda6d5a280b1f14c1fa6069c796d7 | |
parent | 7472de7a1fc3dadb0dedbbbe713cdc4cd7a32d0c (diff) | |
download | ydb-513d8035935699ef5f29d1f58750eadd48f5de6d.tar.gz |
[kqp] one more cleanup in the scan compute actor KIKIMR-15042
ref:bb0b3f2e2d137fe1ab3e51f79e5baff81bc5ead3
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 47 |
1 files changed, 21 insertions, 26 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index b3b57f8baf..d8f8441c02 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -48,7 +48,13 @@ class TKqpScanComputeActor : public TDqComputeActorBase<TKqpScanComputeActor> { EvRetryShard = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), }; - struct TEvRetryShard : public TEventLocal<TEvRetryShard, EvRetryShard> {}; + struct TEvRetryShard : public TEventLocal<TEvRetryShard, EvRetryShard> { + ui64 TabletId; + + TEvRetryShard(ui64 tabletId) + : TabletId(tabletId) + {} + }; }; public: @@ -317,16 +323,13 @@ private: } } - void ProcessPendingScanDataItem() { - - auto& ev = PendingScanData.front().first; + void ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt) { auto& state = Shards.front(); - auto& msg = *ev->Get(); TDuration latency; - if (PendingScanData.front().second != TInstant::Zero()) { - latency = TActivationContext::Now() - PendingScanData.front().second; + if (enqueuedAt != TInstant::Zero()) { + latency = TActivationContext::Now() - enqueuedAt; Counters->ScanQueryRateLimitLatency->Collect(latency.MilliSeconds()); } @@ -396,7 +399,6 @@ private: ScanData->ProfileStats->MessagesByPageFault++; } } - } void ProcessScanData() { @@ -404,7 +406,10 @@ private: Y_VERIFY_DEBUG(!Shards.empty()); Y_VERIFY(!PendingScanData.empty()); - auto& ev = PendingScanData.front().first; + auto ev = std::move(PendingScanData.front().first); + TInstant enqueuedAt = std::move(PendingScanData.front().second); + PendingScanData.pop_front(); + auto& msg = *ev->Get(); auto& state = Shards.front(); @@ -412,7 +417,7 @@ private: case EShardState::Running: case EShardState::PostRunning: { if (state.Generation == msg.Generation) { - ProcessPendingScanDataItem(); + ProcessPendingScanDataItem(ev, enqueuedAt); DoExecute(); } else if (state.Generation > msg.Generation) { TerminateExpiredScan(ev->Sender, "Cancel expired scan"); @@ -429,7 +434,6 @@ private: break; } } - PendingScanData.pop_front(); } void HandleExecute(TEvKqpCompute::TEvScanError::TPtr& ev) { @@ -799,23 +803,14 @@ private: state.ActorId = {}; state.State = EShardState::Starting; state.SubscribedOnTablet = false; - auto retryDelay = state.CalcRetryDelay(); - if (retryDelay) { - CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId - << ", restarting scan from last received key " << PrintLastKey() - << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")" - << " schedule after " << retryDelay); - - state.RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay, - new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard)); - } else { - CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId - << ", restarting scan from last received key " << PrintLastKey() - << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")"); + CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId + << ", restarting scan from last received key " << PrintLastKey() + << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")" + << " schedule after " << retryDelay); - SendStartScanRequest(state, state.Generation); - } + state.RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay, + new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard(state.TabletId))); } bool IsQuotingEnabled() const { |