diff options
author | gvit <gvit@ydb.tech> | 2022-07-21 15:17:42 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-07-21 15:17:42 +0300 |
commit | e69963bdc1ff2a83c4d9914f1af110a4cf04c32e (patch) | |
tree | 7968e3835029dc87c844c916898bc17a7f535620 | |
parent | b0cef02f0738c5798a6d1e28d09101cd5e60709d (diff) | |
download | ydb-e69963bdc1ff2a83c4d9914f1af110a4cf04c32e.tar.gz |
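Fix shard resolver to get actual data: resolve tablet node ids through the pipe per-node cache (TEvPipeCache::TEvGetTabletNode) instead of TEvTabletResolver::TEvForward, and drop the failRatio parameter so that a tablet still unresolved after its retries fails the whole resolve.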
-rw-r--r-- | ydb/core/kqp/executer/kqp_scan_executer.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_shards_resolver.cpp | 93 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_shards_resolver.h | 3 |
3 files changed, 31 insertions, 67 deletions
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index 189ca20da0..169e4aa469 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -103,7 +103,7 @@ public: if (shardIds.size() > 0) { LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")"); - auto kqpShardsResolver = CreateKqpShardsResolver(SelfId(), TxId, std::move(shardIds), 0); + auto kqpShardsResolver = CreateKqpShardsResolver(SelfId(), TxId, std::move(shardIds)); KqpShardsResolverId = RegisterWithSameMailbox(kqpShardsResolver); } else { Execute(); diff --git a/ydb/core/kqp/executer/kqp_shards_resolver.cpp b/ydb/core/kqp/executer/kqp_shards_resolver.cpp index 506dfcf573..cebc64a7bb 100644 --- a/ydb/core/kqp/executer/kqp_shards_resolver.cpp +++ b/ydb/core/kqp/executer/kqp_shards_resolver.cpp @@ -1,6 +1,6 @@ #include "kqp_shards_resolver.h" -#include <ydb/core/base/tablet_resolver.h> +#include <ydb/core/base/tablet_pipecache.h> #include <ydb/core/kqp/executer/kqp_executer.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> @@ -36,21 +36,19 @@ public: } public: - TKqpShardsResolver(const TActorId& owner, ui64 txId, TSet<ui64>&& shardIds, float failRatio) + TKqpShardsResolver(const TActorId& owner, ui64 txId, TSet<ui64>&& shardIds) : Owner(owner) , TxId(txId) , ShardIds(std::move(shardIds)) - , MaxFailedShards(ShardIds.size() * failRatio) {} + , TabletResolver(MakePipePeNodeCacheID(false)) + {} void Bootstrap() { - auto tabletResolver = MakeTabletResolverID(); - auto resolveFlags = GetResolveFlags(); - Y_ASSERT(ShardIds.size() > 0); for (ui64 tabletId : ShardIds) { LOG_T("Send request about tabletId: " << tabletId); - bool sent = Send(tabletResolver, new TEvTabletResolver::TEvForward(tabletId, nullptr, resolveFlags)); + bool sent = Send(TabletResolver, new TEvPipeCache::TEvGetTabletNode(tabletId)); Y_VERIFY_DEBUG(sent); } @@ -60,7 +58,7 @@ public: private: STATEFN(ResolveState) { switch 
(ev->GetTypeRewrite()) { - hFunc(TEvTabletResolver::TEvForwardResult, HandleResolve); + hFunc(TEvPipeCache::TEvGetTabletNodeResult, HandleResolve); cFunc(TEvents::TSystem::Poison, PassAway); default: { LOG_C("Unexpected event: " << ev->GetTypeRewrite()); @@ -69,71 +67,44 @@ private: } } - void HandleResolve(TEvTabletResolver::TEvForwardResult::TPtr& ev) { + void HandleResolve(TEvPipeCache::TEvGetTabletNodeResult::TPtr& ev) { auto* msg = ev->Get(); - LOG_T("Got resolve event for tabletId: " << msg->TabletID << ": " << NKikimrProto::EReplyStatus_Name(msg->Status) - << ", nodeId: " << msg->TabletActor.NodeId()); - - if (msg->Status == NKikimrProto::EReplyStatus::OK) { - Result[msg->TabletID] = msg->TabletActor.NodeId(); - - if (Result.size() + FailedTablets == ShardIds.size()) { - LOG_D("Done, success: " << Result.size() << ", failed: " << FailedTablets); - ReplyAndDie(); - return; + LOG_T("Got resolve event for tabletId: " << msg->TabletId << ", nodeId: " << msg->NodeId); + if (msg->NodeId != 0) { + Result[msg->TabletId] = msg->NodeId; + if (Result.size() == ShardIds.size()) { + LOG_D("Shard resolve complete, resolved shards: " << Result.size()); + return ReplyAndDie(); } return; } - auto& state = States[msg->TabletID]; - if (state.Retries > MAX_RETRIES_COUNT) { - ++FailedTablets; - if (FailedTablets > MaxFailedShards) { - LOG_W("Too many failed requests: " << FailedTablets << " (" << ShardIds.size() << ")"); - ReplyErrorAndDie(Ydb::StatusIds::GENERIC_ERROR, TStringBuilder() - << "Too many unresolved shards: " << FailedTablets); - return; - } - - if (FailedTablets + Result.size() == ShardIds.size()) { - LOG_D("Done, success: " << Result.size() << ", failed: " << FailedTablets); - ReplyAndDie(); - return; - } + ui32& retryCount = RetryCount[msg->TabletId]; + if (retryCount > MAX_RETRIES_COUNT) { + TString reply = TStringBuilder() << "Failed to resolve tablet: " << msg->TabletId << " after several retries."; + LOG_W(reply); + 
ReplyErrorAndDie(Ydb::StatusIds::GENERIC_ERROR, std::move(reply)); + return; - return; // no more retries for this tabletId } - state.Retries++; - - // todo: backoff - Send(MakeTabletResolverID(), new TEvTabletResolver::TEvForward(msg->TabletID, nullptr, GetResolveFlags())); - } - - TEvTabletResolver::TEvForward::TResolveFlags GetResolveFlags() { - TEvTabletResolver::TEvForward::TResolveFlags resolveFlags; - resolveFlags.SetAllowFollower(false); - resolveFlags.SetForceFollower(false); - resolveFlags.SetPreferLocal(true); - resolveFlags.SetForceLocal(false); - - return resolveFlags; + ++retryCount; + Send(TabletResolver, new TEvPipeCache::TEvGetTabletNode(msg->TabletId)); } void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, TString&& message) { - auto replyEv = MakeHolder<TEvKqpExecuter::TEvShardsResolveStatus>(); + auto replyEv = std::make_unique<TEvKqpExecuter::TEvShardsResolveStatus>(); replyEv->Status = status; replyEv->Issues.AddIssue(TIssue(message)); - Send(Owner, replyEv.Release()); + Send(Owner, replyEv.release()); PassAway(); } void ReplyAndDie() { - auto replyEv = MakeHolder<TEvKqpExecuter::TEvShardsResolveStatus>(); + auto replyEv = std::make_unique<TEvKqpExecuter::TEvShardsResolveStatus>(); replyEv->ShardNodes = std::move(Result); - replyEv->Unresolved = FailedTablets; - Send(Owner, replyEv.Release()); + Send(Owner, replyEv.release()); PassAway(); } @@ -141,21 +112,15 @@ private: const TActorId Owner; const ui64 TxId; const TSet<ui64> ShardIds; - const ui32 MaxFailedShards; - - struct TState { - ui32 Retries = 0; - }; - TMap<ui64, TState> States; - ui32 FailedTablets = 0; - + const TActorId TabletResolver; + TMap<ui64, ui32> RetryCount; TMap<ui64, ui64> Result; }; } // anonymous namespace -IActor* CreateKqpShardsResolver(const TActorId& owner, ui64 txId, TSet<ui64>&& shardIds, float failRatio) { - return new TKqpShardsResolver(owner, txId, std::move(shardIds), failRatio); +IActor* CreateKqpShardsResolver(const TActorId& owner, ui64 txId, 
TSet<ui64>&& shardIds) { + return new TKqpShardsResolver(owner, txId, std::move(shardIds)); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer/kqp_shards_resolver.h b/ydb/core/kqp/executer/kqp_shards_resolver.h index 8408d79c7b..720a6086ab 100644 --- a/ydb/core/kqp/executer/kqp_shards_resolver.h +++ b/ydb/core/kqp/executer/kqp_shards_resolver.h @@ -5,7 +5,6 @@ namespace NKikimr::NKqp { -NActors::IActor* CreateKqpShardsResolver(const NActors::TActorId& owner, ui64 txId, TSet<ui64>&& shardIds, - float failRatio); +NActors::IActor* CreateKqpShardsResolver(const NActors::TActorId& owner, ui64 txId, TSet<ui64>&& shardIds); } // namespace NKikimr::NKqp |