aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2022-06-09 15:56:53 +0300
committerVitalii Gridnev <gridnevvvit@gmail.com>2022-06-09 15:56:53 +0300
commit513d8035935699ef5f29d1f58750eadd48f5de6d (patch)
tree93932ed220edda6d5a280b1f14c1fa6069c796d7
parent7472de7a1fc3dadb0dedbbbe713cdc4cd7a32d0c (diff)
downloadydb-513d8035935699ef5f29d1f58750eadd48f5de6d.tar.gz
[kqp] one more cleanup in the scan compute actor KIKIMR-15042
ref:bb0b3f2e2d137fe1ab3e51f79e5baff81bc5ead3
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp47
1 files changed, 21 insertions, 26 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index b3b57f8baf..d8f8441c02 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -48,7 +48,13 @@ class TKqpScanComputeActor : public TDqComputeActorBase<TKqpScanComputeActor> {
EvRetryShard = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
};
- struct TEvRetryShard : public TEventLocal<TEvRetryShard, EvRetryShard> {};
+ struct TEvRetryShard : public TEventLocal<TEvRetryShard, EvRetryShard> {
+ ui64 TabletId;
+
+ TEvRetryShard(ui64 tabletId)
+ : TabletId(tabletId)
+ {}
+ };
};
public:
@@ -317,16 +323,13 @@ private:
}
}
- void ProcessPendingScanDataItem() {
-
- auto& ev = PendingScanData.front().first;
+ void ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt) {
auto& state = Shards.front();
-
auto& msg = *ev->Get();
TDuration latency;
- if (PendingScanData.front().second != TInstant::Zero()) {
- latency = TActivationContext::Now() - PendingScanData.front().second;
+ if (enqueuedAt != TInstant::Zero()) {
+ latency = TActivationContext::Now() - enqueuedAt;
Counters->ScanQueryRateLimitLatency->Collect(latency.MilliSeconds());
}
@@ -396,7 +399,6 @@ private:
ScanData->ProfileStats->MessagesByPageFault++;
}
}
-
}
void ProcessScanData() {
@@ -404,7 +406,10 @@ private:
Y_VERIFY_DEBUG(!Shards.empty());
Y_VERIFY(!PendingScanData.empty());
- auto& ev = PendingScanData.front().first;
+ auto ev = std::move(PendingScanData.front().first);
+ TInstant enqueuedAt = std::move(PendingScanData.front().second);
+ PendingScanData.pop_front();
+
auto& msg = *ev->Get();
auto& state = Shards.front();
@@ -412,7 +417,7 @@ private:
case EShardState::Running:
case EShardState::PostRunning: {
if (state.Generation == msg.Generation) {
- ProcessPendingScanDataItem();
+ ProcessPendingScanDataItem(ev, enqueuedAt);
DoExecute();
} else if (state.Generation > msg.Generation) {
TerminateExpiredScan(ev->Sender, "Cancel expired scan");
@@ -429,7 +434,6 @@ private:
break;
}
}
- PendingScanData.pop_front();
}
void HandleExecute(TEvKqpCompute::TEvScanError::TPtr& ev) {
@@ -799,23 +803,14 @@ private:
state.ActorId = {};
state.State = EShardState::Starting;
state.SubscribedOnTablet = false;
-
auto retryDelay = state.CalcRetryDelay();
- if (retryDelay) {
- CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
- << ", restarting scan from last received key " << PrintLastKey()
- << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")"
- << " schedule after " << retryDelay);
-
- state.RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay,
- new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard));
- } else {
- CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
- << ", restarting scan from last received key " << PrintLastKey()
- << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")");
+ CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
+ << ", restarting scan from last received key " << PrintLastKey()
+ << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")"
+ << " schedule after " << retryDelay);
- SendStartScanRequest(state, state.Generation);
- }
+ state.RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay,
+ new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard(state.TabletId)));
}
bool IsQuotingEnabled() const {