diff options
author | Aleksei Borzenkov <snaury@ydb.tech> | 2024-12-19 17:47:32 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-19 17:47:32 +0300 |
commit | 4b886566d67500dba7acaf4bf35d1fe2fb5ef96b (patch) | |
tree | 1d7ae35da1173ca034cde6bd65e9c7d33bd7b5f3 | |
parent | efaaa8acd75de5142e87bd2d13f7d54d92be3a5f (diff) | |
download | ydb-4b886566d67500dba7acaf4bf35d1fe2fb5ef96b.tar.gz |
Fix disconnected proxies unexpectedly registering in kesus (#12782)
-rw-r--r-- | ydb/core/kesus/proxy/proxy_actor_ut.cpp | 35 | ||||
-rw-r--r-- | ydb/core/kesus/tablet/tablet_impl.cpp | 23 | ||||
-rw-r--r-- | ydb/core/kesus/tablet/tablet_impl.h | 1 | ||||
-rw-r--r-- | ydb/core/kesus/tablet/tablet_ut.cpp | 33 | ||||
-rw-r--r-- | ydb/core/kesus/tablet/ut_helpers.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kesus/tablet/ut_helpers.h | 1 |
6 files changed, 96 insertions, 4 deletions
diff --git a/ydb/core/kesus/proxy/proxy_actor_ut.cpp b/ydb/core/kesus/proxy/proxy_actor_ut.cpp index 98c8a95574..54afe8fb66 100644 --- a/ydb/core/kesus/proxy/proxy_actor_ut.cpp +++ b/ydb/core/kesus/proxy/proxy_actor_ut.cpp @@ -1,4 +1,6 @@ #include "ut_helpers.h" +#include <ydb/core/testlib/actors/block_events.h> +#include <ydb/library/actors/interconnect/interconnect_impl.h> namespace NKikimr { namespace NKesus { @@ -56,6 +58,39 @@ Y_UNIT_TEST_SUITE(TProxyActorTest) { auto req5cookie = ctx.SendAttachSession(42, req5, 1, 0, "", 42); ctx.ExpectAttachSessionResult(req5, req5cookie, Ydb::StatusIds::BAD_SESSION); } + + Y_UNIT_TEST(TestDisconnectWhileAttaching) { + TTestContext ctx; + ctx.Setup(); + + auto& runtime = *ctx.Runtime; + + TBlockEvents<TEvKesus::TEvRegisterProxy> blockedRegister(runtime); + + auto req1 = runtime.AllocateEdgeActor(1); + auto req1cookie = ctx.SendAttachSession(42, req1, 0, 0, "", 111); + + runtime.WaitFor("blocked registrations", [&]{ return blockedRegister.size() >= 1; }); + + // drop link between 2 nodes + { + runtime.Send( + new IEventHandle( + runtime.GetInterconnectProxy(0, 1), + TActorId(), + new TEvInterconnect::TEvDisconnect()), + 0, true); + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvInterconnect::EvNodeDisconnected); + runtime.DispatchEvents(options); + } + + ctx.ExpectProxyError(req1, req1cookie, Ydb::StatusIds::UNAVAILABLE); + + // Proxy must not crash due to unexpected registration after a disconnect + blockedRegister.Stop().Unblock(); + runtime.SimulateSleep(TDuration::Seconds(1)); + } } } diff --git a/ydb/core/kesus/tablet/tablet_impl.cpp b/ydb/core/kesus/tablet/tablet_impl.cpp index 3644f2c63b..e53aaaf24a 100644 --- a/ydb/core/kesus/tablet/tablet_impl.cpp +++ b/ydb/core/kesus/tablet/tablet_impl.cpp @@ -130,13 +130,23 @@ void TKesusTablet::Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev) { void TKesusTablet::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) { const auto* msg = ev->Get(); - for (auto* proxy : ProxiesByNode[msg->NodeId]) { + auto& nodeProxies = ProxiesByNode[msg->NodeId]; + for (auto it = nodeProxies.begin(); it != nodeProxies.end(); /* nothing*/) { + auto* proxy = *it; + if (proxy->InterconnectSession && proxy->InterconnectSession != ev->Sender) { + // Skip proxies from a different session + ++it; + continue; + } ClearProxy(proxy, TActivationContext::AsActorContext()); Proxies.erase(proxy->ActorID); TabletCounters->Simple()[COUNTER_PROXY_COUNT].Add(-1); TabletCounters->Cumulative()[COUNTER_PROXY_KICKED].Increment(1); + nodeProxies.erase(it++); + } + if (nodeProxies.empty()) { + ProxiesByNode.erase(msg->NodeId); } - ProxiesByNode.erase(msg->NodeId); } void TKesusTablet::Handle(TEvents::TEvWakeup::TPtr& ev) { @@ -197,13 +207,20 @@ void TKesusTablet::Handle(TEvKesus::TEvRegisterProxy::TPtr& ev) { } proxy->ActorID = ev->Sender; proxy->Generation = record.GetProxyGeneration(); + proxy->InterconnectSession = ev->InterconnectSession; // New proxy is always cleared when it registers ClearProxy(proxy, TActivationContext::AsActorContext()); ProxiesByNode[ev->Sender.NodeId()].insert(proxy); - Send(ev->Sender, + + // Send result using the same interconnect session the request was received from + auto result = std::make_unique<IEventHandle>(ev->Sender, SelfId(), new TEvKesus::TEvRegisterProxyResult(record.GetProxyGeneration()), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, ev->Cookie); + if (ev->InterconnectSession) { + result->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession); + } + TActivationContext::Send(result.release()); } void TKesusTablet::Handle(TEvKesus::TEvUnregisterProxy::TPtr& ev) { diff --git a/ydb/core/kesus/tablet/tablet_impl.h b/ydb/core/kesus/tablet/tablet_impl.h index 26eab5ad86..6d40117715 100644 --- a/ydb/core/kesus/tablet/tablet_impl.h +++ b/ydb/core/kesus/tablet/tablet_impl.h @@ -91,6 +91,7 @@ private: struct TProxyInfo { TActorId ActorID; ui64 Generation = 0; + TActorId InterconnectSession; THashSet<ui64> AttachedSessions; }; diff --git a/ydb/core/kesus/tablet/tablet_ut.cpp b/ydb/core/kesus/tablet/tablet_ut.cpp index 99a9e51e99..c92025e5b5 100644 --- a/ydb/core/kesus/tablet/tablet_ut.cpp +++ b/ydb/core/kesus/tablet/tablet_ut.cpp @@ -5,6 +5,7 @@ #include <ydb/core/metering/metering.h> +#include <ydb/core/testlib/actors/block_events.h> #include <ydb/library/actors/interconnect/interconnect_impl.h> #include <util/random/random.h> @@ -133,6 +134,38 @@ Y_UNIT_TEST_SUITE(TKesusTest) { ctx.VerifyProxyNotRegistered(proxy); } + Y_UNIT_TEST(TestRegisterProxyLinkFailureRace) { + TTestContext ctx; + ctx.Setup(2); + + // block registration requests + TBlockEvents<TEvKesus::TEvRegisterProxy> blockedRegister(*ctx.Runtime); + + // start registering proxy from the second node + auto proxy = ctx.Runtime->AllocateEdgeActor(1); + ctx.SendRegisterProxy(proxy, 1); + + ctx.Runtime->WaitFor("register request", [&]{ return blockedRegister.size() >= 1; }); + + // drop link between 2 nodes and unblock the request + { + ctx.Runtime->Send( + new IEventHandle( + ctx.Runtime->GetInterconnectProxy(0, 1), + TActorId(), + new TEvInterconnect::TEvDisconnect()), + 0, true); + blockedRegister.Stop().Unblock(); + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvInterconnect::EvNodeDisconnected); + ctx.Runtime->DispatchEvents(options); + } + + // Verify proxy is not registered + ctx.Runtime->SimulateSleep(TDuration::Seconds(1)); + ctx.VerifyProxyNotRegistered(proxy); + } + Y_UNIT_TEST(TestUnregisterProxy) { TTestContext ctx; ctx.Setup(); diff --git a/ydb/core/kesus/tablet/ut_helpers.cpp b/ydb/core/kesus/tablet/ut_helpers.cpp index c95363c6bf..5c9af3ee5b 100644 --- a/ydb/core/kesus/tablet/ut_helpers.cpp +++ b/ydb/core/kesus/tablet/ut_helpers.cpp @@ -188,9 +188,14 @@ void TTestContext::SyncProxy(const TActorId& proxy, ui64 generation, bool useTra ExpectEdgeEvent<TEvKesus::TEvDummyResponse>(proxy, cookie); } -NKikimrKesus::TEvRegisterProxyResult TTestContext::RegisterProxy(const TActorId& proxy, ui64 generation) { +ui64 TTestContext::SendRegisterProxy(const TActorId& proxy, ui64 generation) { ui64 cookie = RandomNumber<ui64>(); SendFromProxy(proxy, generation, new TEvKesus::TEvRegisterProxy("", generation), cookie); + return cookie; +} + +NKikimrKesus::TEvRegisterProxyResult TTestContext::RegisterProxy(const TActorId& proxy, ui64 generation) { + ui64 cookie = SendRegisterProxy(proxy, generation); auto result = ExpectEdgeEvent<TEvKesus::TEvRegisterProxyResult>(proxy, cookie); UNIT_ASSERT_VALUES_EQUAL(result->Record.GetProxyGeneration(), generation); return result->Record; diff --git a/ydb/core/kesus/tablet/ut_helpers.h b/ydb/core/kesus/tablet/ut_helpers.h index 8f487a3033..52632d5c9a 100644 --- a/ydb/core/kesus/tablet/ut_helpers.h +++ b/ydb/core/kesus/tablet/ut_helpers.h @@ -82,6 +82,7 @@ struct TTestContext { void SyncProxy(const TActorId& proxy, ui64 generation, bool useTransactions = false); // Registers a proxy/generation pair with the tablet + ui64 SendRegisterProxy(const TActorId& proxy, ui64 generation); NKikimrKesus::TEvRegisterProxyResult RegisterProxy(const TActorId& proxy, ui64 generation); void MustRegisterProxy(const TActorId& proxy, ui64 generation, Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS); |