about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author gvit <gvit@ydb.tech> 2022-07-21 15:17:42 +0300
committer gvit <gvit@ydb.tech> 2022-07-21 15:17:42 +0300
commit e69963bdc1ff2a83c4d9914f1af110a4cf04c32e (patch)
tree 7968e3835029dc87c844c916898bc17a7f535620
parent b0cef02f0738c5798a6d1e28d09101cd5e60709d (diff)
download ydb-e69963bdc1ff2a83c4d9914f1af110a4cf04c32e.tar.gz
fix shard resolver to get actual data
-rw-r--r-- ydb/core/kqp/executer/kqp_scan_executer.cpp 2
-rw-r--r-- ydb/core/kqp/executer/kqp_shards_resolver.cpp 93
-rw-r--r-- ydb/core/kqp/executer/kqp_shards_resolver.h 3
3 files changed, 31 insertions, 67 deletions
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 189ca20da0..169e4aa469 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -103,7 +103,7 @@ public:
if (shardIds.size() > 0) {
LOG_D("Start resolving tablets nodes... (" << shardIds.size() << ")");
- auto kqpShardsResolver = CreateKqpShardsResolver(SelfId(), TxId, std::move(shardIds), 0);
+ auto kqpShardsResolver = CreateKqpShardsResolver(SelfId(), TxId, std::move(shardIds));
KqpShardsResolverId = RegisterWithSameMailbox(kqpShardsResolver);
} else {
Execute();
diff --git a/ydb/core/kqp/executer/kqp_shards_resolver.cpp b/ydb/core/kqp/executer/kqp_shards_resolver.cpp
index 506dfcf573..cebc64a7bb 100644
--- a/ydb/core/kqp/executer/kqp_shards_resolver.cpp
+++ b/ydb/core/kqp/executer/kqp_shards_resolver.cpp
@@ -1,6 +1,6 @@
#include "kqp_shards_resolver.h"
-#include <ydb/core/base/tablet_resolver.h>
+#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/kqp/executer/kqp_executer.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
@@ -36,21 +36,19 @@ public:
}
public:
- TKqpShardsResolver(const TActorId& owner, ui64 txId, TSet<ui64>&& shardIds, float failRatio)
+ TKqpShardsResolver(const TActorId& owner, ui64 txId, TSet<ui64>&& shardIds)
: Owner(owner)
, TxId(txId)
, ShardIds(std::move(shardIds))
- , MaxFailedShards(ShardIds.size() * failRatio) {}
+ , TabletResolver(MakePipePeNodeCacheID(false))
+ {}
void Bootstrap() {
- auto tabletResolver = MakeTabletResolverID();
- auto resolveFlags = GetResolveFlags();
-
Y_ASSERT(ShardIds.size() > 0);
for (ui64 tabletId : ShardIds) {
LOG_T("Send request about tabletId: " << tabletId);
- bool sent = Send(tabletResolver, new TEvTabletResolver::TEvForward(tabletId, nullptr, resolveFlags));
+ bool sent = Send(TabletResolver, new TEvPipeCache::TEvGetTabletNode(tabletId));
Y_VERIFY_DEBUG(sent);
}
@@ -60,7 +58,7 @@ public:
private:
STATEFN(ResolveState) {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvTabletResolver::TEvForwardResult, HandleResolve);
+ hFunc(TEvPipeCache::TEvGetTabletNodeResult, HandleResolve);
cFunc(TEvents::TSystem::Poison, PassAway);
default: {
LOG_C("Unexpected event: " << ev->GetTypeRewrite());
@@ -69,71 +67,44 @@ private:
}
}
- void HandleResolve(TEvTabletResolver::TEvForwardResult::TPtr& ev) {
+ void HandleResolve(TEvPipeCache::TEvGetTabletNodeResult::TPtr& ev) {
auto* msg = ev->Get();
- LOG_T("Got resolve event for tabletId: " << msg->TabletID << ": " << NKikimrProto::EReplyStatus_Name(msg->Status)
- << ", nodeId: " << msg->TabletActor.NodeId());
-
- if (msg->Status == NKikimrProto::EReplyStatus::OK) {
- Result[msg->TabletID] = msg->TabletActor.NodeId();
-
- if (Result.size() + FailedTablets == ShardIds.size()) {
- LOG_D("Done, success: " << Result.size() << ", failed: " << FailedTablets);
- ReplyAndDie();
- return;
+ LOG_T("Got resolve event for tabletId: " << msg->TabletId << ", nodeId: " << msg->NodeId);
+ if (msg->NodeId != 0) {
+ Result[msg->TabletId] = msg->NodeId;
+ if (Result.size() == ShardIds.size()) {
+ LOG_D("Shard resolve complete, resolved shards: " << Result.size());
+ return ReplyAndDie();
}
return;
}
- auto& state = States[msg->TabletID];
- if (state.Retries > MAX_RETRIES_COUNT) {
- ++FailedTablets;
- if (FailedTablets > MaxFailedShards) {
- LOG_W("Too many failed requests: " << FailedTablets << " (" << ShardIds.size() << ")");
- ReplyErrorAndDie(Ydb::StatusIds::GENERIC_ERROR, TStringBuilder()
- << "Too many unresolved shards: " << FailedTablets);
- return;
- }
-
- if (FailedTablets + Result.size() == ShardIds.size()) {
- LOG_D("Done, success: " << Result.size() << ", failed: " << FailedTablets);
- ReplyAndDie();
- return;
- }
+ ui32& retryCount = RetryCount[msg->TabletId];
+ if (retryCount > MAX_RETRIES_COUNT) {
+ TString reply = TStringBuilder() << "Failed to resolve tablet: " << msg->TabletId << " after several retries.";
+ LOG_W(reply);
+ ReplyErrorAndDie(Ydb::StatusIds::GENERIC_ERROR, std::move(reply));
+ return;
- return; // no more retries for this tabletId
}
- state.Retries++;
-
- // todo: backoff
- Send(MakeTabletResolverID(), new TEvTabletResolver::TEvForward(msg->TabletID, nullptr, GetResolveFlags()));
- }
-
- TEvTabletResolver::TEvForward::TResolveFlags GetResolveFlags() {
- TEvTabletResolver::TEvForward::TResolveFlags resolveFlags;
- resolveFlags.SetAllowFollower(false);
- resolveFlags.SetForceFollower(false);
- resolveFlags.SetPreferLocal(true);
- resolveFlags.SetForceLocal(false);
-
- return resolveFlags;
+ ++retryCount;
+ Send(TabletResolver, new TEvPipeCache::TEvGetTabletNode(msg->TabletId));
}
void ReplyErrorAndDie(Ydb::StatusIds::StatusCode status, TString&& message) {
- auto replyEv = MakeHolder<TEvKqpExecuter::TEvShardsResolveStatus>();
+ auto replyEv = std::make_unique<TEvKqpExecuter::TEvShardsResolveStatus>();
replyEv->Status = status;
replyEv->Issues.AddIssue(TIssue(message));
- Send(Owner, replyEv.Release());
+ Send(Owner, replyEv.release());
PassAway();
}
void ReplyAndDie() {
- auto replyEv = MakeHolder<TEvKqpExecuter::TEvShardsResolveStatus>();
+ auto replyEv = std::make_unique<TEvKqpExecuter::TEvShardsResolveStatus>();
replyEv->ShardNodes = std::move(Result);
- replyEv->Unresolved = FailedTablets;
- Send(Owner, replyEv.Release());
+ Send(Owner, replyEv.release());
PassAway();
}
@@ -141,21 +112,15 @@ private:
const TActorId Owner;
const ui64 TxId;
const TSet<ui64> ShardIds;
- const ui32 MaxFailedShards;
-
- struct TState {
- ui32 Retries = 0;
- };
- TMap<ui64, TState> States;
- ui32 FailedTablets = 0;
-
+ const TActorId TabletResolver;
+ TMap<ui64, ui32> RetryCount;
TMap<ui64, ui64> Result;
};
} // anonymous namespace
-IActor* CreateKqpShardsResolver(const TActorId& owner, ui64 txId, TSet<ui64>&& shardIds, float failRatio) {
- return new TKqpShardsResolver(owner, txId, std::move(shardIds), failRatio);
+IActor* CreateKqpShardsResolver(const TActorId& owner, ui64 txId, TSet<ui64>&& shardIds) {
+ return new TKqpShardsResolver(owner, txId, std::move(shardIds));
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer/kqp_shards_resolver.h b/ydb/core/kqp/executer/kqp_shards_resolver.h
index 8408d79c7b..720a6086ab 100644
--- a/ydb/core/kqp/executer/kqp_shards_resolver.h
+++ b/ydb/core/kqp/executer/kqp_shards_resolver.h
@@ -5,7 +5,6 @@
namespace NKikimr::NKqp {
-NActors::IActor* CreateKqpShardsResolver(const NActors::TActorId& owner, ui64 txId, TSet<ui64>&& shardIds,
- float failRatio);
+NActors::IActor* CreateKqpShardsResolver(const NActors::TActorId& owner, ui64 txId, TSet<ui64>&& shardIds);
} // namespace NKikimr::NKqp