diff options
author | ssmike <ssmike@ydb.tech> | 2023-03-01 17:22:43 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-03-01 17:22:43 +0300 |
commit | 257d958d39fcc443f08920bfe8c1c87b760e1716 (patch) | |
tree | 6774d1181d05031b45ed01c127b3062f681156f3 | |
parent | f36fdcfbd29c3b260b7de1c02000b23572b1168d (diff) | |
download | ydb-257d958d39fcc443f08920bfe8c1c87b760e1716.tar.gz |
timed retry delays
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 54 |
1 files changed, 51 insertions, 3 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 76d1c7210a..2fe5f3c764 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -10,6 +10,7 @@
 #include <ydb/core/tx/datashard/datashard.h>
 #include <ydb/core/tx/datashard/range_ops.h>
 #include <ydb/core/tx/scheme_cache/scheme_cache.h>
+#include <ydb/core/actorlib_impl/long_timer.h>
 
 #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h>
 
@@ -62,6 +63,8 @@ THolder<NKikimr::TEvDataShard::TEvReadAck> DefaultAckSettings() {
 
 NActors::TActorId PipeCacheId = NKikimr::MakePipePeNodeCacheID(false);
 
+TDuration StartRetryDelay = TDuration::MilliSeconds(250);
+
 }
 
 
@@ -315,6 +318,22 @@ public:
         }
     };
 
+    enum EEv {
+        EvRetryShard = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
+    };
+
+    struct TEvRetryShard: public TEventLocal<TEvRetryShard, EvRetryShard> {
+    public:
+        explicit TEvRetryShard(const ui64 readId, const ui64 maxSeqNo)
+            : ReadId(readId)
+            , MaxSeqNo(maxSeqNo)
+        {
+        }
+    public:
+        ui64 ReadId = 0;
+        ui64 MaxSeqNo = 0;
+    };
+
 public:
     TKqpReadActor(
         NKikimrTxDataShard::TKqpReadRangesSourceSettings&& settings,
@@ -364,6 +383,7 @@ public:
                 hFunc(TEvDataShard::TEvReadResult, HandleRead);
                 hFunc(TEvTxProxySchemeCache::TEvResolveKeySetResult, HandleResolve);
                 hFunc(TEvPipeCache::TEvDeliveryProblem, HandleError);
+                hFunc(TEvRetryShard, HandleRetry);
                 IgnoreFunc(TEvInterconnect::TEvNodeConnected);
                 IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult);
             }
@@ -627,7 +647,32 @@ public:
         StartTableScan();
     }
 
-    void RetryRead(ui64 id) {
+    void HandleRetry(TEvRetryShard::TPtr& ev) {
+        auto& read = Reads[ev->Get()->ReadId];
+        if (read.LastSeqNo <= ev->Get()->MaxSeqNo) {
+            DoRetryRead(ev->Get()->ReadId);
+        }
+    }
+
+    void RetryRead(ui64 id, bool allowInstantRetry = true) {
+        if (!Reads[id]) {
+            return;
+        }
+
+        auto state = Reads[id].Shard;
+        if (state->RetryAttempt == 0 && allowInstantRetry) { // instant retry
+            return DoRetryRead(id);
+        }
+        auto delay = ::StartRetryDelay;
+        for (size_t i = 0; i < state->RetryAttempt; ++i) {
+            delay *= 2;
+        }
+
+        CA_LOG_D("schedule retry #" << id << " after " << delay);
+        TlsActivationContext->Schedule(delay, new IEventHandle(SelfId(), SelfId(), new TEvRetryShard(id, Reads[id].LastSeqNo)));
+    }
+
+    void DoRetryRead(ui64 id) {
         if (!Reads[id]) {
             return;
         }
@@ -750,7 +795,9 @@ public:
         switch (record.GetStatus().GetCode()) {
             case Ydb::StatusIds::SUCCESS:
                 break;
-            case Ydb::StatusIds::OVERLOADED:
+            case Ydb::StatusIds::OVERLOADED: {
+                return RetryRead(id, false);
+            }
             case Ydb::StatusIds::INTERNAL_ERROR: {
                 return RetryRead(id);
             }
@@ -800,7 +847,7 @@ public:
         reads.swap(ReadIdByTabletId[msg.TabletId]);
         for (auto read : reads) {
             CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered);
-            RetryRead(read);
+            RetryRead(read, false);
         }
     }
 
@@ -975,6 +1022,7 @@ public:
                 bytes += rowSize.AllocatedBytes;
                 if (ProcessedRowCount == Settings.GetItemsLimit()) {
                     finished = true;
+                    CA_LOG_D(TStringBuilder() << " returned async data because limit reached");
                     return bytes;
                 }
             }