aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <snaury@ydb.tech>2024-12-19 17:47:32 +0300
committerGitHub <noreply@github.com>2024-12-19 17:47:32 +0300
commit4b886566d67500dba7acaf4bf35d1fe2fb5ef96b (patch)
tree1d7ae35da1173ca034cde6bd65e9c7d33bd7b5f3
parentefaaa8acd75de5142e87bd2d13f7d54d92be3a5f (diff)
downloadydb-4b886566d67500dba7acaf4bf35d1fe2fb5ef96b.tar.gz
Fix disconnected proxies unexpectedly registering in kesus (#12782)
-rw-r--r--ydb/core/kesus/proxy/proxy_actor_ut.cpp35
-rw-r--r--ydb/core/kesus/tablet/tablet_impl.cpp23
-rw-r--r--ydb/core/kesus/tablet/tablet_impl.h1
-rw-r--r--ydb/core/kesus/tablet/tablet_ut.cpp33
-rw-r--r--ydb/core/kesus/tablet/ut_helpers.cpp7
-rw-r--r--ydb/core/kesus/tablet/ut_helpers.h1
6 files changed, 96 insertions, 4 deletions
diff --git a/ydb/core/kesus/proxy/proxy_actor_ut.cpp b/ydb/core/kesus/proxy/proxy_actor_ut.cpp
index 98c8a95574..54afe8fb66 100644
--- a/ydb/core/kesus/proxy/proxy_actor_ut.cpp
+++ b/ydb/core/kesus/proxy/proxy_actor_ut.cpp
@@ -1,4 +1,6 @@
#include "ut_helpers.h"
+#include <ydb/core/testlib/actors/block_events.h>
+#include <ydb/library/actors/interconnect/interconnect_impl.h>
namespace NKikimr {
namespace NKesus {
@@ -56,6 +58,39 @@ Y_UNIT_TEST_SUITE(TProxyActorTest) {
auto req5cookie = ctx.SendAttachSession(42, req5, 1, 0, "", 42);
ctx.ExpectAttachSessionResult(req5, req5cookie, Ydb::StatusIds::BAD_SESSION);
}
+
+ Y_UNIT_TEST(TestDisconnectWhileAttaching) {
+ TTestContext ctx;
+ ctx.Setup();
+
+ auto& runtime = *ctx.Runtime;
+
+ TBlockEvents<TEvKesus::TEvRegisterProxy> blockedRegister(runtime);
+
+ auto req1 = runtime.AllocateEdgeActor(1);
+ auto req1cookie = ctx.SendAttachSession(42, req1, 0, 0, "", 111);
+
+ runtime.WaitFor("blocked registrations", [&]{ return blockedRegister.size() >= 1; });
+
+ // drop link between 2 nodes
+ {
+ runtime.Send(
+ new IEventHandle(
+ runtime.GetInterconnectProxy(0, 1),
+ TActorId(),
+ new TEvInterconnect::TEvDisconnect()),
+ 0, true);
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvInterconnect::EvNodeDisconnected);
+ runtime.DispatchEvents(options);
+ }
+
+ ctx.ExpectProxyError(req1, req1cookie, Ydb::StatusIds::UNAVAILABLE);
+
+ // Proxy must not crash due to unexpected registration after a disconnect
+ blockedRegister.Stop().Unblock();
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ }
}
}
diff --git a/ydb/core/kesus/tablet/tablet_impl.cpp b/ydb/core/kesus/tablet/tablet_impl.cpp
index 3644f2c63b..e53aaaf24a 100644
--- a/ydb/core/kesus/tablet/tablet_impl.cpp
+++ b/ydb/core/kesus/tablet/tablet_impl.cpp
@@ -130,13 +130,23 @@ void TKesusTablet::Handle(TEvInterconnect::TEvNodeConnected::TPtr& ev) {
void TKesusTablet::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr& ev) {
const auto* msg = ev->Get();
- for (auto* proxy : ProxiesByNode[msg->NodeId]) {
+ auto& nodeProxies = ProxiesByNode[msg->NodeId];
+ for (auto it = nodeProxies.begin(); it != nodeProxies.end(); /* nothing*/) {
+ auto* proxy = *it;
+ if (proxy->InterconnectSession && proxy->InterconnectSession != ev->Sender) {
+ // Skip proxies from a different session
+ ++it;
+ continue;
+ }
ClearProxy(proxy, TActivationContext::AsActorContext());
Proxies.erase(proxy->ActorID);
TabletCounters->Simple()[COUNTER_PROXY_COUNT].Add(-1);
TabletCounters->Cumulative()[COUNTER_PROXY_KICKED].Increment(1);
+ nodeProxies.erase(it++);
+ }
+ if (nodeProxies.empty()) {
+ ProxiesByNode.erase(msg->NodeId);
}
- ProxiesByNode.erase(msg->NodeId);
}
void TKesusTablet::Handle(TEvents::TEvWakeup::TPtr& ev) {
@@ -197,13 +207,20 @@ void TKesusTablet::Handle(TEvKesus::TEvRegisterProxy::TPtr& ev) {
}
proxy->ActorID = ev->Sender;
proxy->Generation = record.GetProxyGeneration();
+ proxy->InterconnectSession = ev->InterconnectSession;
// New proxy is always cleared when it registers
ClearProxy(proxy, TActivationContext::AsActorContext());
ProxiesByNode[ev->Sender.NodeId()].insert(proxy);
- Send(ev->Sender,
+
+ // Send result using the same interconnect session the request was received from
+ auto result = std::make_unique<IEventHandle>(ev->Sender, SelfId(),
new TEvKesus::TEvRegisterProxyResult(record.GetProxyGeneration()),
IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession,
ev->Cookie);
+ if (ev->InterconnectSession) {
+ result->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession);
+ }
+ TActivationContext::Send(result.release());
}
void TKesusTablet::Handle(TEvKesus::TEvUnregisterProxy::TPtr& ev) {
diff --git a/ydb/core/kesus/tablet/tablet_impl.h b/ydb/core/kesus/tablet/tablet_impl.h
index 26eab5ad86..6d40117715 100644
--- a/ydb/core/kesus/tablet/tablet_impl.h
+++ b/ydb/core/kesus/tablet/tablet_impl.h
@@ -91,6 +91,7 @@ private:
struct TProxyInfo {
TActorId ActorID;
ui64 Generation = 0;
+ TActorId InterconnectSession;
THashSet<ui64> AttachedSessions;
};
diff --git a/ydb/core/kesus/tablet/tablet_ut.cpp b/ydb/core/kesus/tablet/tablet_ut.cpp
index 99a9e51e99..c92025e5b5 100644
--- a/ydb/core/kesus/tablet/tablet_ut.cpp
+++ b/ydb/core/kesus/tablet/tablet_ut.cpp
@@ -5,6 +5,7 @@
#include <ydb/core/metering/metering.h>
+#include <ydb/core/testlib/actors/block_events.h>
#include <ydb/library/actors/interconnect/interconnect_impl.h>
#include <util/random/random.h>
@@ -133,6 +134,38 @@ Y_UNIT_TEST_SUITE(TKesusTest) {
ctx.VerifyProxyNotRegistered(proxy);
}
+ Y_UNIT_TEST(TestRegisterProxyLinkFailureRace) {
+ TTestContext ctx;
+ ctx.Setup(2);
+
+ // block registration requests
+ TBlockEvents<TEvKesus::TEvRegisterProxy> blockedRegister(*ctx.Runtime);
+
+ // start registering proxy from the second node
+ auto proxy = ctx.Runtime->AllocateEdgeActor(1);
+ ctx.SendRegisterProxy(proxy, 1);
+
+ ctx.Runtime->WaitFor("register request", [&]{ return blockedRegister.size() >= 1; });
+
+ // drop link between 2 nodes and unblock the request
+ {
+ ctx.Runtime->Send(
+ new IEventHandle(
+ ctx.Runtime->GetInterconnectProxy(0, 1),
+ TActorId(),
+ new TEvInterconnect::TEvDisconnect()),
+ 0, true);
+ blockedRegister.Stop().Unblock();
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvInterconnect::EvNodeDisconnected);
+ ctx.Runtime->DispatchEvents(options);
+ }
+
+ // Verify proxy is not registered
+ ctx.Runtime->SimulateSleep(TDuration::Seconds(1));
+ ctx.VerifyProxyNotRegistered(proxy);
+ }
+
Y_UNIT_TEST(TestUnregisterProxy) {
TTestContext ctx;
ctx.Setup();
diff --git a/ydb/core/kesus/tablet/ut_helpers.cpp b/ydb/core/kesus/tablet/ut_helpers.cpp
index c95363c6bf..5c9af3ee5b 100644
--- a/ydb/core/kesus/tablet/ut_helpers.cpp
+++ b/ydb/core/kesus/tablet/ut_helpers.cpp
@@ -188,9 +188,14 @@ void TTestContext::SyncProxy(const TActorId& proxy, ui64 generation, bool useTra
ExpectEdgeEvent<TEvKesus::TEvDummyResponse>(proxy, cookie);
}
-NKikimrKesus::TEvRegisterProxyResult TTestContext::RegisterProxy(const TActorId& proxy, ui64 generation) {
+ui64 TTestContext::SendRegisterProxy(const TActorId& proxy, ui64 generation) {
ui64 cookie = RandomNumber<ui64>();
SendFromProxy(proxy, generation, new TEvKesus::TEvRegisterProxy("", generation), cookie);
+ return cookie;
+}
+
+NKikimrKesus::TEvRegisterProxyResult TTestContext::RegisterProxy(const TActorId& proxy, ui64 generation) {
+ ui64 cookie = SendRegisterProxy(proxy, generation);
auto result = ExpectEdgeEvent<TEvKesus::TEvRegisterProxyResult>(proxy, cookie);
UNIT_ASSERT_VALUES_EQUAL(result->Record.GetProxyGeneration(), generation);
return result->Record;
diff --git a/ydb/core/kesus/tablet/ut_helpers.h b/ydb/core/kesus/tablet/ut_helpers.h
index 8f487a3033..52632d5c9a 100644
--- a/ydb/core/kesus/tablet/ut_helpers.h
+++ b/ydb/core/kesus/tablet/ut_helpers.h
@@ -82,6 +82,7 @@ struct TTestContext {
void SyncProxy(const TActorId& proxy, ui64 generation, bool useTransactions = false);
// Registers a proxy/generation pair with the tablet
+ ui64 SendRegisterProxy(const TActorId& proxy, ui64 generation);
NKikimrKesus::TEvRegisterProxyResult RegisterProxy(const TActorId& proxy, ui64 generation);
void MustRegisterProxy(const TActorId& proxy, ui64 generation, Ydb::StatusIds::StatusCode status = Ydb::StatusIds::SUCCESS);