aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshumkovnd <shumkovnd@yandex-team.com>2023-05-29 19:36:48 +0300
committershumkovnd <shumkovnd@yandex-team.com>2023-05-29 19:36:48 +0300
commit1f91bff6f709b6f762453f020dce570db6d2b180 (patch)
tree5b4bb56f40ae41dc9aaf455696afd4b0282582ee
parent17e7defc44375d4dc7ef3466536d5321c27f81b9 (diff)
downloadydb-1f91bff6f709b6f762453f020dce570db6d2b180.tar.gz
reconnect replicas for subscriber
-rw-r--r--ydb/core/base/board_lookup.cpp270
-rw-r--r--ydb/core/base/board_replica.cpp38
-rw-r--r--ydb/core/base/board_subscriber_ut.cpp208
-rw-r--r--ydb/core/base/statestorage.h9
-rw-r--r--ydb/core/base/statestorage_impl.h3
-rw-r--r--ydb/core/protos/statestorage.proto2
6 files changed, 416 insertions, 114 deletions
diff --git a/ydb/core/base/board_lookup.cpp b/ydb/core/base/board_lookup.cpp
index 28959ab861..d991cbc810 100644
--- a/ydb/core/base/board_lookup.cpp
+++ b/ydb/core/base/board_lookup.cpp
@@ -1,5 +1,7 @@
#include "statestorage_impl.h"
#include "tabletid.h"
+
+#include <ydb/core/base/appdata.h>
#include <ydb/core/protos/services.pb.h>
#include <library/cpp/actors/core/interconnect.h>
@@ -26,17 +28,38 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
const ui32 StateStorageGroupId;
const bool Subscriber;
+ static constexpr int MAX_REPLICAS_COUNT_EXP = 32; // Replicas.size() <= 2**MAX_REPLICAS_COUNT_EXP
+
+ struct TEvPrivate {
+ enum EEv {
+ EvReconnectReplicas = EventSpaceBegin(TEvents::ES_PRIVATE),
+ };
+
+ struct TEvReconnectReplicas :
+ public TEventLocal<TEvReconnectReplicas, EEv::EvReconnectReplicas> {
+ ui32 ReplicaIdx;
+
+ TEvReconnectReplicas(ui32 replicaIdx) : ReplicaIdx(replicaIdx) {
+ }
+ };
+ };
+
enum class EReplicaState {
Unknown,
NotAvailable,
NoInfo,
Ready,
+ Reconnect,
};
struct TReplica {
TActorId Replica;
EReplicaState State = EReplicaState::Unknown;
THashSet<TActorId> Infos;
+ bool IsScheduled = false;
+ ui32 ReconnectNumber = 0;
+ NMonotonic::TMonotonic LastReconnectAt = TMonotonic::Zero();
+ TDuration CurrentDelay = TDuration::MilliSeconds(100);
};
TVector<TReplica> Replicas;
@@ -46,6 +69,19 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
ui32 WaitForReplicasToSuccess;
+ TDuration GetReconnectDelayForReplica(TReplica& replica) {
+ auto ret = replica.CurrentDelay;
+ auto newDelay = replica.CurrentDelay;
+ newDelay *= 2;
+ if (newDelay > TDuration::Seconds(5)) {
+ newDelay = TDuration::Seconds(5);
+ }
+ newDelay *= AppData()->RandomProvider->Uniform(100, 115);
+ newDelay /= 100;
+ replica.CurrentDelay = newDelay;
+ return ret;
+ }
+
struct {
ui32 Replied = 0;
ui32 NoInfo = 0;
@@ -121,8 +157,9 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
for (auto idx : xrange(msg->Replicas.size())) {
const TActorId &replica = msg->Replicas[idx];
Send(replica,
- new TEvStateStorage::TEvReplicaBoardLookup(Path, TActorId(), Subscriber),
- IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, idx);
+ new TEvStateStorage::TEvReplicaBoardLookup(Path, Subscriber),
+ IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession,
+ EncodeCookie(idx, 0));
Replicas[idx].Replica = replica;
Replicas[idx].State = EReplicaState::Unknown;
}
@@ -149,18 +186,20 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
}
void Handle(TEvStateStorage::TEvReplicaBoardInfoUpdate::TPtr &ev) {
- const auto &record = ev->Get()->Record;
- const ui32 idx = ev->Cookie;
- if (idx >= Replicas.size()) {
+ const auto [idx, reconnectNumber] = DecodeCookie(ev->Cookie);
+
+ if (idx >= Replicas.size())
return;
- }
auto &replica = Replicas[idx];
- if (replica.State == EReplicaState::NotAvailable) {
+
+ if (reconnectNumber != replica.ReconnectNumber) {
+ return;
+ }
+ if (replica.State != EReplicaState::Ready && replica.State != EReplicaState::NoInfo) {
return;
}
- replica.State = EReplicaState::Ready;
-
+ const auto &record = ev->Get()->Record;
auto& info = record.GetInfo();
const TActorId oid = ActorIdFromProto(info.GetOwner());
@@ -174,26 +213,27 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
}
if (CurrentStateFunc() == &TThis::StateSubscribe) {
- TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate update;
- update.Owner = oid;
+ std::optional<TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate> update;
if (info.GetDropped()) {
if (!replicas.empty()) {
return;
}
InfoReplicas.erase(oid);
Info.erase(oid);
- update.Dropped = true;
+ update = { "", true };
} else {
- if (Info[oid].Payload != info.GetPayload()) {
- Info[oid].Payload = info.GetPayload();
- update.Payload = std::move(info.GetPayload());
+ auto& currentInfo = Info[oid];
+ if (currentInfo.Payload != info.GetPayload()) {
+ currentInfo.Payload = info.GetPayload();
+ update = { info.GetPayload(), false };
}
}
-
- auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>(
- TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
- reply->Update = std::move(update);
- Send(Owner, std::move(reply));
+ if (update.has_value()) {
+ auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>(
+ TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
+ reply->Updates = { { oid, std::move(update.value()) } };
+ Send(Owner, std::move(reply));
+ }
} else {
if (info.GetDropped()) {
if (!replicas.empty()) {
@@ -202,20 +242,34 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
InfoReplicas.erase(oid);
Info.erase(oid);
} else {
- Info[oid].Payload = std::move(info.GetPayload());
+ Info[oid].Payload = info.GetPayload();
}
}
}
void Handle(TEvStateStorage::TEvReplicaBoardInfo::TPtr &ev) {
- const auto &record = ev->Get()->Record;
- const ui32 idx = ev->Cookie;
+ const auto [idx, reconnectNumber] = DecodeCookie(ev->Cookie);
+
if (idx >= Replicas.size())
return;
auto &replica = Replicas[idx];
- if (replica.State != EReplicaState::Unknown)
+
+ if (reconnectNumber != replica.ReconnectNumber) {
+ return;
+ }
+ if (replica.State != EReplicaState::Unknown && replica.State != EReplicaState::Reconnect) {
return;
- ++Stats.Replied;
+ }
+
+ const auto &record = ev->Get()->Record;
+
+ if (replica.State == EReplicaState::Unknown) {
+ Stats.Replied++;
+ } else {
+ Y_VERIFY(Stats.NotAvailable);
+ Stats.NotAvailable--;
+ }
+
if (record.GetDropped()) {
replica.State = EReplicaState::NoInfo;
++Stats.NoInfo;
@@ -224,12 +278,30 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
replica.State = EReplicaState::Ready;
++Stats.HasInfo;
- for (auto &x : record.GetInfo()) {
+ bool isStateSubscribe = (CurrentStateFunc() == &TThis::StateSubscribe);
+ TMap<TActorId, TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate> updates;
+
+ for (const auto &x : record.GetInfo()) {
const TActorId oid = ActorIdFromProto(x.GetOwner());
- Info[oid].Payload = std::move(x.GetPayload());
+
+ auto& currentInfo = Info[oid];
+ if (currentInfo.Payload != x.GetPayload()) {
+ currentInfo.Payload = x.GetPayload();
+ if (isStateSubscribe) {
+ updates[oid] = {x.GetPayload(), false};
+ }
+ }
+
InfoReplicas[oid].insert(idx);
replica.Infos.insert(oid);
}
+
+ if (isStateSubscribe && !updates.empty()) {
+ auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>(
+ TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
+ reply->Updates = std::move(updates);
+ Send(Owner, std::move(reply));
+ }
}
CheckCompletion();
@@ -244,13 +316,23 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
++Stats.Replied;
}
if (replica.State != EReplicaState::NotAvailable) {
+ if (replica.State == EReplicaState::Ready) {
+ Y_VERIFY(Stats.HasInfo);
+ Stats.HasInfo--;
+ } else if (replica.State == EReplicaState::NoInfo) {
+ Y_VERIFY(Stats.NoInfo);
+ Stats.NoInfo--;
+ }
+ if (replica.State != EReplicaState::Reconnect) {
+ ++Stats.NotAvailable;
+ }
replica.State = EReplicaState::NotAvailable;
- ++Stats.NotAvailable;
}
- for (auto infoId : replica.Infos) {
- InfoReplicas[infoId].erase(idx);
- }
+ ClearInfosByReplica(idx);
+ replica.Infos.clear();
+
+ ReconnectReplica(idx);
}
}
@@ -258,24 +340,34 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
}
void Handle(TEvStateStorage::TEvReplicaShutdown::TPtr &ev) {
- const ui32 idx = ev->Cookie;
+ const auto [idx, reconnectNumber] = DecodeCookie(ev->Cookie);
+
if (idx >= Replicas.size())
return;
auto &replica = Replicas[idx];
- Y_VERIFY(replica.Replica == ev->Sender);
- if (replica.State == EReplicaState::Unknown) {
- ++Stats.Replied;
+ if (reconnectNumber != replica.ReconnectNumber) {
+ return;
}
- if (replica.State != EReplicaState::NotAvailable) {
- replica.State = EReplicaState::NotAvailable;
- ++Stats.NotAvailable;
+
+ if (replica.State != EReplicaState::Ready && replica.State != EReplicaState::NoInfo) {
+ return;
}
- for (auto infoId : replica.Infos) {
- InfoReplicas[infoId].erase(idx);
+ if (replica.State == EReplicaState::Ready) {
+ Y_VERIFY(Stats.HasInfo);
+ --Stats.HasInfo;
+ } else if (replica.State == EReplicaState::NoInfo) {
+ Y_VERIFY(Stats.NoInfo);
+ --Stats.NoInfo;
}
+ replica.State = EReplicaState::NotAvailable;
+ ++Stats.NotAvailable;
+
+ ClearInfosByReplica(idx);
+ replica.Infos.clear();
+
CheckCompletion();
}
@@ -283,22 +375,103 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
auto *msg = ev->Get();
if (msg->SourceType != TEvStateStorage::TEvReplicaBoardLookup::EventType)
return;
- const ui32 idx = ev->Cookie;
+
+ const auto [idx, reconnectNumber] = DecodeCookie(ev->Cookie);
+
if (idx >= Replicas.size())
return;
auto &replica = Replicas[idx];
- if (replica.State != EReplicaState::Unknown)
+
+ if (reconnectNumber != replica.ReconnectNumber) {
return;
- replica.State = EReplicaState::NotAvailable;
- ++Stats.Replied;
- ++Stats.NotAvailable;
+ }
+ if (replica.State != EReplicaState::Reconnect && replica.State != EReplicaState::Unknown) {
+ return;
+ }
- for (auto infoId : replica.Infos) {
- InfoReplicas[infoId].erase(idx);
+ if (replica.State == EReplicaState::Unknown) {
+ ++Stats.Replied;
+ ++Stats.NotAvailable;
}
+ replica.State = EReplicaState::NotAvailable;
+
+ ClearInfosByReplica(idx);
+ replica.Infos.clear();
+
CheckCompletion();
}
+
+ void ReconnectReplica(ui32 replicaIdx) {
+ auto& replica = Replicas[replicaIdx];
+
+ if (!Subscriber) {
+ return;
+ }
+ if (replica.IsScheduled || replica.State != EReplicaState::NotAvailable) {
+ return;
+ }
+
+ auto now = TlsActivationContext->Monotonic();
+ if (now - replica.LastReconnectAt < replica.CurrentDelay) {
+ auto at = replica.LastReconnectAt + GetReconnectDelayForReplica(replica);
+ replica.IsScheduled = true;
+ Schedule(at - now, new TEvPrivate::TEvReconnectReplicas(replicaIdx));
+ return;
+ }
+
+ replica.ReconnectNumber++;
+ replica.State = EReplicaState::Reconnect;
+ Send(replica.Replica,
+ new TEvStateStorage::TEvReplicaBoardLookup(Path, Subscriber),
+ IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession,
+ EncodeCookie(replicaIdx, replica.ReconnectNumber));
+
+ replica.LastReconnectAt = now;
+ }
+
+ void Handle(TEvPrivate::TEvReconnectReplicas::TPtr& ev) {
+ const auto& idx = ev->Get()->ReplicaIdx;
+ Replicas[idx].IsScheduled = false;
+ ReconnectReplica(idx);
+ }
+
+ std::pair<ui64, ui64> DecodeCookie(ui64 cookie) {
+ return {((1ULL << MAX_REPLICAS_COUNT_EXP) - 1) & cookie, cookie >> MAX_REPLICAS_COUNT_EXP};
+ }
+
+ ui64 EncodeCookie(ui64 idx, ui64 reconnectNumber) {
+ return idx | (reconnectNumber << MAX_REPLICAS_COUNT_EXP);
+ }
+
+ void ClearInfosByReplica(ui32 replicaIdx) {
+ bool isStateSubscribe = (CurrentStateFunc() == &TThis::StateSubscribe);
+ TMap<TActorId, TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate> updates;
+
+ const auto& replica = Replicas[replicaIdx];
+ for (auto infoId : replica.Infos) {
+ auto infoReplicasIt = InfoReplicas.find(infoId);
+ if (infoReplicasIt == InfoReplicas.end()) {
+ continue;
+ }
+ infoReplicasIt->second.erase(replicaIdx);
+ if (infoReplicasIt->second.empty()) {
+ if (isStateSubscribe) {
+ auto& update = updates[infoId];
+ update.Dropped = true;
+ }
+ InfoReplicas.erase(infoId);
+ Info.erase(infoId);
+ }
+ }
+ if (isStateSubscribe && !updates.empty()) {
+ auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>(
+ TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
+ reply->Updates = std::move(updates);
+ Send(Owner, std::move(reply));
+ }
+ }
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::BOARD_LOOKUP_ACTOR;
@@ -333,16 +506,19 @@ public:
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
hFunc(TEvStateStorage::TEvReplicaShutdown, Handle);
+ hFunc(TEvPrivate::TEvReconnectReplicas, Handle);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
}
}
STATEFN(StateSubscribe) {
switch (ev->GetTypeRewrite()) {
+ hFunc(TEvStateStorage::TEvReplicaBoardInfo, Handle);
hFunc(TEvStateStorage::TEvReplicaBoardInfoUpdate, Handle);
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
hFunc(TEvStateStorage::TEvReplicaShutdown, Handle);
+ hFunc(TEvPrivate::TEvReconnectReplicas, Handle);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
}
}
diff --git a/ydb/core/base/board_replica.cpp b/ydb/core/base/board_replica.cpp
index c82e1debd8..a01260debc 100644
--- a/ydb/core/base/board_replica.cpp
+++ b/ydb/core/base/board_replica.cpp
@@ -31,7 +31,7 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> {
};
struct TPathSubscribeData {
- THashMap<TActorId, ui32> Subscribers; // Subcriber -> Cookie
+ THashMap<TActorId, ui64> Subscribers; // Subcriber -> Cookie
};
struct TSubscriber {
@@ -253,6 +253,7 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> {
auto &record = ev->Get()->Record;
const auto &path = record.GetPath();
+ ui32 flags = 0;
if (record.GetSubscribe()) {
auto& pathSubscribeData = PathToSubscribers[path];
pathSubscribeData.Subscribers[ev->Sender] = ev->Cookie;
@@ -268,37 +269,28 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> {
sessionsIt->second.Subscribers.insert(ev->Sender);
}
}
- }
-
- ui32 flags = 0;
- if (record.GetSubscribe()) {
flags = IEventHandle::FlagTrackDelivery;
}
+ std::unique_ptr<TEvStateStorage::TEvReplicaBoardInfo> reply;
+
auto pathIt = IndexPath.find(path);
if (pathIt == IndexPath.end()) {
- auto reply = std::make_unique<TEvStateStorage::TEvReplicaBoardInfo>(path, true);
- auto resp = std::make_unique<IEventHandle>(
- ev->Sender, ev->Recipient, reply.release(), flags, ev->Cookie);
- if (ev->InterconnectSession) {
- resp->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession);
+ reply = std::make_unique<TEvStateStorage::TEvReplicaBoardInfo>(path, true);
+ } else {
+ reply = std::make_unique<TEvStateStorage::TEvReplicaBoardInfo>(path, false);
+ auto *info = reply->Record.MutableInfo();
+ info->Reserve(pathIt->second.size());
+ for (ui32 entryIndex : pathIt->second) {
+ const TEntry &entry = Entries[entryIndex];
+ auto *ex = info->Add();
+ ActorIdToProto(entry.Owner, ex->MutableOwner());
+ ex->SetPayload(entry.Payload);
}
- TActivationContext::Send(resp.release());
- return;
- }
-
- auto reply = MakeHolder<TEvStateStorage::TEvReplicaBoardInfo>(path, false);
- auto *info = reply->Record.MutableInfo();
- info->Reserve(pathIt->second.size());
- for (ui32 entryIndex : pathIt->second) {
- const TEntry &entry = Entries[entryIndex];
- auto *ex = info->Add();
- ActorIdToProto(entry.Owner, ex->MutableOwner());
- ex->SetPayload(entry.Payload);
}
auto resp = std::make_unique<IEventHandle>(
- ev->Sender, SelfId(), reply.Release(), flags, ev->Cookie);
+ ev->Sender, SelfId(), reply.release(), flags, ev->Cookie);
if (ev->InterconnectSession) {
resp->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession);
}
diff --git a/ydb/core/base/board_subscriber_ut.cpp b/ydb/core/base/board_subscriber_ut.cpp
index 82432dde09..404432b979 100644
--- a/ydb/core/base/board_subscriber_ut.cpp
+++ b/ydb/core/base/board_subscriber_ut.cpp
@@ -60,13 +60,23 @@ class TBoardSubscriberTest: public NUnitTest::TTestBase {
Context->DispatchEvents(options);
}
+ TVector<TActorId> ResolveReplicas() {
+ const TActorId proxy = MakeStateStorageProxyID(0);
+ const TActorId edge = Context->AllocateEdgeActor();
+
+ Context->Send(proxy, edge, new TEvStateStorage::TEvResolveBoard("path"));
+ auto ev = Context->GrabEdgeEvent<TEvStateStorage::TEvResolveReplicasList>(edge);
+
+ auto allReplicas = ev->Get()->Replicas;
+ return TVector<TActorId>(allReplicas.begin(), allReplicas.end());
+ }
+
+
public:
void SetUp() override {
- Context = MakeHolder<TTestBasicRuntime>(2);
+ Context = MakeHolder<TTestBasicRuntime>(3);
- for (ui32 i : xrange(Context->GetNodeCount())) {
- SetupStateStorage(*Context, i, 0);
- }
+ SetupCustomStateStorage(*Context, 3, 3, 1, 0);
Context->Initialize(TAppPrepare().Unwrap());
}
@@ -78,12 +88,16 @@ public:
UNIT_TEST_SUITE(TBoardSubscriberTest);
UNIT_TEST(SimpleSubscriber);
UNIT_TEST(ManySubscribersManyPublisher);
- UNIT_TEST(DisconnectReplica);
+ UNIT_TEST(NotAvailableByShutdown);
+ UNIT_TEST(ReconnectReplica);
+ UNIT_TEST(DropByDisconnect);
UNIT_TEST_SUITE_END();
void SimpleSubscriber();
void ManySubscribersManyPublisher();
- void DisconnectReplica();
+ void NotAvailableByShutdown();
+ void ReconnectReplica();
+ void DropByDisconnect();
private:
THolder<TTestBasicRuntime> Context;
@@ -109,20 +123,26 @@ void TBoardSubscriberTest::SimpleSubscriber() {
auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber);
UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
- const auto &update = event->Get()->Update;
+ const auto &updates = event->Get()->Updates;
+ UNIT_ASSERT(updates.size() == 1);
+
+ auto updatesIt = updates.find(publisher);
+ UNIT_ASSERT(updatesIt != updates.end());
- UNIT_ASSERT(!update.Dropped);
- UNIT_ASSERT_EQUAL(update.Owner, publisher);
- UNIT_ASSERT_EQUAL(update.Payload, "test");
+ UNIT_ASSERT(!updatesIt->second.Dropped);
+ UNIT_ASSERT_EQUAL(updatesIt->second.Payload, "test");
}
KillPublisher(edgePublisher, publisher, 1);
{
auto event = Context->GrabEdgeEventIf<TEvStateStorage::TEvBoardInfoUpdate>(
- {edgeSubscriber}, [edgeSubscriber](const auto& ev){
- const auto &update = ev->Get()->Update;
- if (ev->Recipient == edgeSubscriber && update.Dropped) {
+ {edgeSubscriber}, [edgeSubscriber, publisher](const auto& ev){
+ const auto &updates = ev->Get()->Updates;
+ UNIT_ASSERT(updates.size() == 1);
+ auto updatesIt = updates.find(publisher);
+ UNIT_ASSERT(updatesIt != updates.end());
+ if (ev->Recipient == edgeSubscriber && updatesIt->second.Dropped) {
return true;
}
return false;
@@ -130,10 +150,11 @@ void TBoardSubscriberTest::SimpleSubscriber() {
UNIT_ASSERT(event);
UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
- const auto &update = event->Get()->Update;
+ const auto &updates = event->Get()->Updates;
- UNIT_ASSERT(update.Dropped);
- UNIT_ASSERT_EQUAL(update.Owner, publisher);
+ auto updatesIt = updates.find(publisher);
+ UNIT_ASSERT(updatesIt != updates.end());
+ UNIT_ASSERT(updatesIt->second.Dropped);
}
}
@@ -144,8 +165,8 @@ void TBoardSubscriberTest::ManySubscribersManyPublisher() {
THashSet<TActorId> subscribers;
for (size_t i = 0; i < subscribersCount; i++) {
- const auto edgeSubscriber = Context->AllocateEdgeActor(i % 2);
- CreateSubscriber("path", edgeSubscriber, i % 2);
+ const auto edgeSubscriber = Context->AllocateEdgeActor(i % 3);
+ CreateSubscriber("path", edgeSubscriber, i % 3);
subscribers.insert(edgeSubscriber);
{
auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber);
@@ -156,41 +177,162 @@ void TBoardSubscriberTest::ManySubscribersManyPublisher() {
}
for (size_t i = 0; i < publishersCount; i++) {
- const auto edgePublisher = Context->AllocateEdgeActor(i % 2);
- const auto publisher = CreatePublisher("path", ToString(i), edgePublisher, i % 2);
+ const auto edgePublisher = Context->AllocateEdgeActor(i % 3);
+ const auto publisher = CreatePublisher("path", ToString(i), edgePublisher, i % 3);
for (const auto& subscriber : subscribers) {
auto event = Context->GrabEdgeEventIf<TEvStateStorage::TEvBoardInfoUpdate>(
{subscriber}, [publisher](const auto& ev) mutable {
- if (ev->Get()->Update.Owner == publisher) {
+ const auto &updates = ev->Get()->Updates;
+ UNIT_ASSERT(updates.size() == 1);
+ auto updatesIt = updates.find(publisher);
+ if (updatesIt != updates.end()) {
return true;
}
return false;
});
UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
- UNIT_ASSERT_EQUAL(event->Get()->Update.Payload, ToString(i));
+
+ const auto &updates = event->Get()->Updates;
+ UNIT_ASSERT(updates.size() == 1);
+
+ auto updatesIt = updates.find(publisher);
+ UNIT_ASSERT(updatesIt != updates.end());
+
+ UNIT_ASSERT_EQUAL(updatesIt->second.Payload, ToString(i));
}
}
}
-void TBoardSubscriberTest::DisconnectReplica() {
+void TBoardSubscriberTest::NotAvailableByShutdown() {
- std::vector<TActorId> replicas;
+ auto replicas = ResolveReplicas();
+
+ const auto edgeSubscriber = Context->AllocateEdgeActor(1);
+ CreateSubscriber("path", edgeSubscriber, 1);
{
- const auto edgeSubscriber = Context->AllocateEdgeActor(1);
- CreateSubscriber("path", edgeSubscriber, 1);
+ auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber);
+ UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
+ UNIT_ASSERT(event->Get()->InfoEntries.empty());
+ }
- {
- auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber);
- UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
- UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
- UNIT_ASSERT(event->Get()->InfoEntries.empty());
+ Send(replicas[0], TActorId(), new TEvents::TEvPoisonPill(), 0, true);
+ Send(replicas[1], TActorId(), new TEvents::TEvPoisonPill(), 0, true);
+
+
+ auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber);
+ UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable);
+}
+
+void TBoardSubscriberTest::ReconnectReplica() {
+
+ const auto edgeSubscriber = Context->AllocateEdgeActor(1);
+ CreateSubscriber("path", edgeSubscriber, 1);
+
+ {
+ auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber);
+ UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
+ UNIT_ASSERT(event->Get()->InfoEntries.empty());
+ }
+
+ Disconnect(1, 0);
+
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvStateStorage::EvReplicaBoardInfo);
+ Context->DispatchEvents(options);
+
+ Disconnect(1, 2);
+
+ const auto edgePublisher = Context->AllocateEdgeActor(0);
+ const TActorId publisher = CreatePublisher("path", "test", edgePublisher, 0);
+
+ {
+ auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber);
+ UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
+ const auto &updates = event->Get()->Updates;
+ UNIT_ASSERT(updates.size() == 1);
+
+ auto updatesIt = updates.find(publisher);
+ UNIT_ASSERT(updatesIt != updates.end());
+
+ UNIT_ASSERT_EQUAL(updatesIt->second.Payload, "test");
+ }
+}
+
+void TBoardSubscriberTest::DropByDisconnect() {
+ auto replicas = ResolveReplicas();
+
+ const auto edgeSubscriber = Context->AllocateEdgeActor(1);
+ CreateSubscriber("path", edgeSubscriber, 1);
+
+ {
+ auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber);
+ UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
+ UNIT_ASSERT(event->Get()->InfoEntries.empty());
+ }
+
+ auto prevObserverFunc = Context->SetObserverFunc([&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) {
+ switch (ev->GetTypeRewrite()) {
+ case TEvStateStorage::TEvReplicaBoardPublish::EventType: {
+ if (ev->Recipient != replicas[0]) {
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ break;
+ }
}
+ return TTestActorRuntime::EEventAction::PROCESS;
+ });
+
+ const auto edgePublisher = Context->AllocateEdgeActor(0);
+ const TActorId publisher = CreatePublisher("path", "test", edgePublisher, 0);
+
+ {
+ auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber);
+ UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
+ const auto &updates = event->Get()->Updates;
+ UNIT_ASSERT(updates.size() == 1);
+
+ auto updatesIt = updates.find(publisher);
+ UNIT_ASSERT(updatesIt != updates.end());
+
+ UNIT_ASSERT_EQUAL(updatesIt->second.Payload, "test");
+ }
+
+ Disconnect(1, 0);
+
+ {
+ auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber);
+ UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
+ const auto &updates = event->Get()->Updates;
+ UNIT_ASSERT(updates.size() == 1);
+
+ auto updatesIt = updates.find(publisher);
+ UNIT_ASSERT(updatesIt != updates.end());
+
+ UNIT_ASSERT(updatesIt->second.Dropped);
+ }
- Disconnect(1, 0);
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvStateStorage::EvReplicaBoardInfo);
+ Context->DispatchEvents(options);
+ {
auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber);
- UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable);
+ UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
+ const auto &updates = event->Get()->Updates;
+ UNIT_ASSERT(updates.size() == 1);
+
+ auto updatesIt = updates.find(publisher);
+ UNIT_ASSERT(updatesIt != updates.end());
+
+ UNIT_ASSERT_EQUAL(updatesIt->second.Payload, "test");
}
}
diff --git a/ydb/core/base/statestorage.h b/ydb/core/base/statestorage.h
index f3a7c91608..b7ac8c124a 100644
--- a/ydb/core/base/statestorage.h
+++ b/ydb/core/base/statestorage.h
@@ -459,25 +459,18 @@ struct TEvStateStorage {
struct TEvBoardInfoUpdate : public TEventLocal<TEvBoardInfoUpdate, EvBoardInfoUpdate> {
struct TInfoEntryUpdate {
- TActorId Owner;
TString Payload;
bool Dropped = false;
};
const TEvBoardInfo::EStatus Status;
const TString Path;
- TInfoEntryUpdate Update;
+ TMap<TActorId,TInfoEntryUpdate> Updates;
TEvBoardInfoUpdate(TEvBoardInfo::EStatus status, const TString &path)
: Status(status)
, Path(path)
{}
-
- TEvBoardInfoUpdate(const TEvBoardInfoUpdate &x)
- : Status(x.Status)
- , Path(x.Path)
- , Update(x.Update)
- {}
};
};
diff --git a/ydb/core/base/statestorage_impl.h b/ydb/core/base/statestorage_impl.h
index 669518eea9..639108fc1e 100644
--- a/ydb/core/base/statestorage_impl.h
+++ b/ydb/core/base/statestorage_impl.h
@@ -292,10 +292,9 @@ struct TEvStateStorage::TEvReplicaBoardLookup : public TEventPB<TEvStateStorage:
TEvReplicaBoardLookup()
{}
- TEvReplicaBoardLookup(const TString &path, TActorId owner, bool sub)
+ TEvReplicaBoardLookup(const TString &path, bool sub)
{
Record.SetPath(path);
- ActorIdToProto(owner, Record.MutableOwner());
Record.SetSubscribe(sub);
}
};
diff --git a/ydb/core/protos/statestorage.proto b/ydb/core/protos/statestorage.proto
index d746ca2917..7bd4a86ebe 100644
--- a/ydb/core/protos/statestorage.proto
+++ b/ydb/core/protos/statestorage.proto
@@ -96,7 +96,7 @@ message TEvReplicaBoardPublishAck {
message TEvReplicaBoardLookup {
optional string Path = 1;
- optional NActorsProto.TActorId Owner = 2;
+ reserved 2;
optional bool Subscribe = 3;
};