aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: ssmike <ssmike@ydb.tech> 2023-03-01 17:22:43 +0300
committer: ssmike <ssmike@ydb.tech> 2023-03-01 17:22:43 +0300
commit: 257d958d39fcc443f08920bfe8c1c87b760e1716 (patch)
tree: 6774d1181d05031b45ed01c127b3062f681156f3
parent: f36fdcfbd29c3b260b7de1c02000b23572b1168d (diff)
download: ydb-257d958d39fcc443f08920bfe8c1c87b760e1716.tar.gz
timed retry delays
-rw-r--r-- ydb/core/kqp/runtime/kqp_read_actor.cpp 54
1 files changed, 51 insertions, 3 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 76d1c7210a..2fe5f3c764 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -10,6 +10,7 @@
#include <ydb/core/tx/datashard/datashard.h>
#include <ydb/core/tx/datashard/range_ops.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
@@ -62,6 +63,8 @@ THolder<NKikimr::TEvDataShard::TEvReadAck> DefaultAckSettings() {
NActors::TActorId PipeCacheId = NKikimr::MakePipePeNodeCacheID(false);
+TDuration StartRetryDelay = TDuration::MilliSeconds(250); // base delay for scheduled read retries; RetryRead doubles it once per previous retry attempt
+
}
@@ -315,6 +318,22 @@ public:
}
};
+ enum EEv { // event ids private to this actor
+ EvRetryShard = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), // id of TEvRetryShard; NOTE(review): confirm no other ES_PRIVATE event handled by this actor uses the same id
+ };
+
+ struct TEvRetryShard: public TEventLocal<TEvRetryShard, EvRetryShard> { // local event the actor schedules to itself to perform a delayed retry of one read
+ public:
+ explicit TEvRetryShard(const ui64 readId, const ui64 maxSeqNo) // readId: index into Reads; maxSeqNo: that read's LastSeqNo at the moment the retry was scheduled
+ : ReadId(readId)
+ , MaxSeqNo(maxSeqNo)
+ {
+ }
+ public:
+ ui64 ReadId = 0; // which read to retry
+ ui64 MaxSeqNo = 0; // HandleRetry performs the retry only while the read's LastSeqNo is still <= this value
+ };
+
public:
TKqpReadActor(
NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings,
@@ -364,6 +383,7 @@ public:
hFunc(TEvDataShard::TEvReadResult, HandleRead);
hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleResolve);
hFunc(TEvPipeCache::TEvDeliveryProblem, HandleError);
+ hFunc(TEvRetryShard, HandleRetry);
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult);
}
@@ -627,7 +647,32 @@ public:
StartTableScan();
}
- void RetryRead(ui64 id) {
+ void HandleRetry(TEvRetryShard::TPtr& ev) { // a scheduled retry fired: retry the read unless it has moved on since the retry was scheduled
+ auto& read = Reads[ev->Get()->ReadId]; // NOTE(review): ReadId is used as an index without validation here; DoRetryRead re-checks Reads[id] before acting
+ if (read.LastSeqNo <= ev->Get()->MaxSeqNo) { // skip when LastSeqNo has advanced past the value captured at scheduling time (stale timer)
+ DoRetryRead(ev->Get()->ReadId);
+ }
+ }
+
+ void RetryRead(ui64 id, bool allowInstantRetry = true) { // retry read #id: immediately on the first attempt when allowed, otherwise after a delay that doubles per attempt
+ if (!Reads[id]) { // read state evaluates to false (presumably finished or reset - confirm): nothing to retry
+ return;
+ }
+
+ auto state = Reads[id].Shard; // shard state of this read; its RetryAttempt counter drives the backoff below
+ if (state->RetryAttempt == 0 && allowInstantRetry) { // instant retry
+ return DoRetryRead(id);
+ }
+ auto delay = ::StartRetryDelay; // resulting delay is StartRetryDelay * 2^RetryAttempt
+ for (size_t i = 0; i < state->RetryAttempt; ++i) { // NOTE(review): no upper cap on the delay in this block - confirm RetryAttempt is bounded elsewhere
+ delay *= 2;
+ }
+
+ CA_LOG_D("schedule retry #" << id << " after " << delay);
+ TlsActivationContext->Schedule(delay, new IEventHandle(SelfId(), SelfId(), new TEvRetryShard(id, Reads[id].LastSeqNo))); // self-addressed TEvRetryShard after `delay`; carries the current LastSeqNo so HandleRetry can drop stale timers
+ }
+
+ void DoRetryRead(ui64 id) {
if (!Reads[id]) {
return;
}
@@ -750,7 +795,9 @@ public:
switch (record.GetStatus().GetCode()) {
case Ydb::StatusIds::SUCCESS:
break;
- case Ydb::StatusIds::OVERLOADED:
+ case Ydb::StatusIds::OVERLOADED: {
+ return RetryRead(id, false);
+ }
case Ydb::StatusIds::INTERNAL_ERROR: {
return RetryRead(id);
}
@@ -800,7 +847,7 @@ public:
reads.swap(ReadIdByTabletId[msg.TabletId]);
for (auto read : reads) {
CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered);
- RetryRead(read);
+ RetryRead(read, false);
}
}
@@ -975,6 +1022,7 @@ public:
bytes += rowSize.AllocatedBytes;
if (ProcessedRowCount == Settings.GetItemsLimit()) {
finished = true;
+ CA_LOG_D(TStringBuilder() << " returned async data because limit reached");
return bytes;
}
}