aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorshumkovnd <shumkovnd@yandex-team.com>2023-05-23 20:17:49 +0300
committershumkovnd <shumkovnd@yandex-team.com>2023-05-23 20:17:49 +0300
commitcf214e2939c005ecb728689e108180a752bacbc9 (patch)
treef2b0c8e8e749100b22841593e49586821e6554c6
parent78300d3d4b282c3c948e3ad181ec183ce32adcc5 (diff)
downloadydb-cf214e2939c005ecb728689e108180a752bacbc9.tar.gz
board lookup subscribers
-rw-r--r--ydb/core/base/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/base/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/base/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/base/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/base/board_lookup.cpp191
-rw-r--r--ydb/core/base/board_replica.cpp310
-rw-r--r--ydb/core/base/board_subscriber_ut.cpp197
-rw-r--r--ydb/core/base/statestorage.h32
-rw-r--r--ydb/core/base/statestorage_impl.h17
-rw-r--r--ydb/core/base/ut_board_subscriber/CMakeLists.darwin-x86_64.txt82
-rw-r--r--ydb/core/base/ut_board_subscriber/CMakeLists.linux-aarch64.txt85
-rw-r--r--ydb/core/base/ut_board_subscriber/CMakeLists.linux-x86_64.txt87
-rw-r--r--ydb/core/base/ut_board_subscriber/CMakeLists.txt17
-rw-r--r--ydb/core/base/ut_board_subscriber/CMakeLists.windows-x86_64.txt75
-rw-r--r--ydb/core/discovery/discovery.cpp2
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp2
-rw-r--r--ydb/core/kqp/rm_service/kqp_resource_tracker.cpp3
-rw-r--r--ydb/core/load_test/service_actor.cpp4
-rw-r--r--ydb/core/mind/tenant_node_enumeration.cpp2
-rw-r--r--ydb/core/protos/statestorage.proto11
-rw-r--r--ydb/core/viewer/json_pipe_req.h4
21 files changed, 1047 insertions, 78 deletions
diff --git a/ydb/core/base/CMakeLists.darwin-x86_64.txt b/ydb/core/base/CMakeLists.darwin-x86_64.txt
index bef496eecc..dc2eb3c54b 100644
--- a/ydb/core/base/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/base/CMakeLists.darwin-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(services)
add_subdirectory(ut)
+add_subdirectory(ut_board_subscriber)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
diff --git a/ydb/core/base/CMakeLists.linux-aarch64.txt b/ydb/core/base/CMakeLists.linux-aarch64.txt
index c66927f294..1ea9e10a1b 100644
--- a/ydb/core/base/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/base/CMakeLists.linux-aarch64.txt
@@ -8,6 +8,7 @@
add_subdirectory(services)
add_subdirectory(ut)
+add_subdirectory(ut_board_subscriber)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
diff --git a/ydb/core/base/CMakeLists.linux-x86_64.txt b/ydb/core/base/CMakeLists.linux-x86_64.txt
index c66927f294..1ea9e10a1b 100644
--- a/ydb/core/base/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/base/CMakeLists.linux-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(services)
add_subdirectory(ut)
+add_subdirectory(ut_board_subscriber)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
diff --git a/ydb/core/base/CMakeLists.windows-x86_64.txt b/ydb/core/base/CMakeLists.windows-x86_64.txt
index bef496eecc..dc2eb3c54b 100644
--- a/ydb/core/base/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/base/CMakeLists.windows-x86_64.txt
@@ -8,6 +8,7 @@
add_subdirectory(services)
add_subdirectory(ut)
+add_subdirectory(ut_board_subscriber)
get_built_tool_path(
TOOL_enum_parser_bin
TOOL_enum_parser_dependency
diff --git a/ydb/core/base/board_lookup.cpp b/ydb/core/base/board_lookup.cpp
index 890b88f6b9..28959ab861 100644
--- a/ydb/core/base/board_lookup.cpp
+++ b/ydb/core/base/board_lookup.cpp
@@ -24,6 +24,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
const TActorId Owner;
const EBoardLookupMode Mode;
const ui32 StateStorageGroupId;
+ const bool Subscriber;
enum class EReplicaState {
Unknown,
@@ -35,10 +36,13 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
struct TReplica {
TActorId Replica;
EReplicaState State = EReplicaState::Unknown;
+ THashSet<TActorId> Infos;
};
TVector<TReplica> Replicas;
+
TMap<TActorId, TEvStateStorage::TEvBoardInfo::TInfoEntry> Info;
+ THashMap<TActorId, THashSet<ui32>> InfoReplicas;
ui32 WaitForReplicasToSuccess;
@@ -46,31 +50,63 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
ui32 Replied = 0;
ui32 NoInfo = 0;
ui32 HasInfo = 0;
+ ui32 NotAvailable = 0;
} Stats;
void PassAway() override {
- for (const auto &replica : Replicas)
- if (replica.Replica.NodeId() != SelfId().NodeId())
+ for (const auto &replica : Replicas) {
+ if (Subscriber) {
+ Send(replica.Replica, new TEvStateStorage::TEvReplicaBoardUnsubscribe());
+ }
+ if (replica.Replica.NodeId() != SelfId().NodeId()) {
Send(TActivationContext::InterconnectProxy(replica.Replica.NodeId()), new TEvents::TEvUnsubscribe());
+ }
+ }
TActor::PassAway();
}
void NotAvailable() {
- Send(Owner, new TEvStateStorage::TEvBoardInfo(TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path));
+ if (CurrentStateFunc() != &TThis::StateSubscribe) {
+ Send(Owner, new TEvStateStorage::TEvBoardInfo(TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path));
+ } else {
+ Send(Owner,
+ new TEvStateStorage::TEvBoardInfoUpdate(
+ TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path
+ )
+ );
+ }
return PassAway();
}
void CheckCompletion() {
- if (Stats.HasInfo == WaitForReplicasToSuccess) {
- auto reply = MakeHolder<TEvStateStorage::TEvBoardInfo>(TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
- reply->InfoEntries = std::move(Info);
- Send(Owner, std::move(reply));
+ if (CurrentStateFunc() != &TThis::StateSubscribe) {
+ if ((!Subscriber && Stats.HasInfo == WaitForReplicasToSuccess) ||
+ (Subscriber && Stats.HasInfo + Stats.NoInfo == WaitForReplicasToSuccess)) {
+ auto reply = MakeHolder<TEvStateStorage::TEvBoardInfo>(
+ TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
+ reply->InfoEntries = std::move(Info);
+ Send(Owner, std::move(reply));
+ if (Subscriber) {
+ Become(&TThis::StateSubscribe);
+ return;
+ }
+ return PassAway();
+ }
- return PassAway();
+ if (!Subscriber) {
+ if (Stats.Replied == Replicas.size()) {
+ return NotAvailable();
+ }
+ } else {
+ if (Stats.NotAvailable > (Replicas.size() - WaitForReplicasToSuccess)) {
+ return NotAvailable();
+ }
+ }
+ } else {
+ if (Stats.NotAvailable > (Replicas.size() - WaitForReplicasToSuccess)) {
+ return NotAvailable();
+ }
}
-
- if (Stats.Replied == Replicas.size())
- return NotAvailable();
}
void Handle(TEvStateStorage::TEvResolveReplicasList::TPtr &ev) {
@@ -84,7 +120,9 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
Replicas.resize(msg->Replicas.size());
for (auto idx : xrange(msg->Replicas.size())) {
const TActorId &replica = msg->Replicas[idx];
- Send(replica, new TEvStateStorage::TEvReplicaBoardLookup(Path, TActorId(), false), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, idx);
+ Send(replica,
+ new TEvStateStorage::TEvReplicaBoardLookup(Path, TActorId(), Subscriber),
+ IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, idx);
Replicas[idx].Replica = replica;
Replicas[idx].State = EReplicaState::Unknown;
}
@@ -100,6 +138,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
break;
case EBoardLookupMode::Majority:
case EBoardLookupMode::MajorityDoubleTime:
+ case EBoardLookupMode::Subscription:
WaitForReplicasToSuccess = (Replicas.size() / 2 + 1);
break;
default:
@@ -109,6 +148,65 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
Become(&TThis::StateLookup);
}
+ void Handle(TEvStateStorage::TEvReplicaBoardInfoUpdate::TPtr &ev) {
+ const auto &record = ev->Get()->Record;
+ const ui32 idx = ev->Cookie;
+ if (idx >= Replicas.size()) {
+ return;
+ }
+ auto &replica = Replicas[idx];
+ if (replica.State == EReplicaState::NotAvailable) {
+ return;
+ }
+
+ replica.State = EReplicaState::Ready;
+
+ auto& info = record.GetInfo();
+ const TActorId oid = ActorIdFromProto(info.GetOwner());
+
+ auto& replicas = InfoReplicas[oid];
+ if (info.GetDropped()) {
+ replicas.erase(idx);
+ replica.Infos.erase(oid);
+ } else {
+ replicas.insert(idx);
+ replica.Infos.insert(oid);
+ }
+
+ if (CurrentStateFunc() == &TThis::StateSubscribe) {
+ TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate update;
+ update.Owner = oid;
+ if (info.GetDropped()) {
+ if (!replicas.empty()) {
+ return;
+ }
+ InfoReplicas.erase(oid);
+ Info.erase(oid);
+ update.Dropped = true;
+ } else {
+ if (Info[oid].Payload != info.GetPayload()) {
+ Info[oid].Payload = info.GetPayload();
+ update.Payload = std::move(info.GetPayload());
+ }
+ }
+
+ auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>(
+ TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path);
+ reply->Update = std::move(update);
+ Send(Owner, std::move(reply));
+ } else {
+ if (info.GetDropped()) {
+ if (!replicas.empty()) {
+ return;
+ }
+ InfoReplicas.erase(oid);
+ Info.erase(oid);
+ } else {
+ Info[oid].Payload = std::move(info.GetPayload());
+ }
+ }
+ }
+
void Handle(TEvStateStorage::TEvReplicaBoardInfo::TPtr &ev) {
const auto &record = ev->Get()->Record;
const ui32 idx = ev->Cookie;
@@ -117,7 +215,6 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
auto &replica = Replicas[idx];
if (replica.State != EReplicaState::Unknown)
return;
-
++Stats.Replied;
if (record.GetDropped()) {
replica.State = EReplicaState::NoInfo;
@@ -129,7 +226,9 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
for (auto &x : record.GetInfo()) {
const TActorId oid = ActorIdFromProto(x.GetOwner());
- Info[oid].Payload = x.GetPayload();
+ Info[oid].Payload = std::move(x.GetPayload());
+ InfoReplicas[oid].insert(idx);
+ replica.Infos.insert(oid);
}
}
@@ -138,17 +237,48 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) {
const ui32 nodeId = ev->Get()->NodeId;
- for (auto &replica : Replicas) {
- if (replica.Replica.NodeId() == nodeId && replica.State == EReplicaState::Unknown) {
- replica.State = EReplicaState::NotAvailable;
- ++Stats.Replied;
- ++Stats.NoInfo;
+ for (ui32 idx = 0; idx < Replicas.size(); idx++) {
+ auto& replica = Replicas[idx];
+ if (replica.Replica.NodeId() == nodeId) {
+ if (replica.State == EReplicaState::Unknown) {
+ ++Stats.Replied;
+ }
+ if (replica.State != EReplicaState::NotAvailable) {
+ replica.State = EReplicaState::NotAvailable;
+ ++Stats.NotAvailable;
+ }
+
+ for (auto infoId : replica.Infos) {
+ InfoReplicas[infoId].erase(idx);
+ }
}
}
CheckCompletion();
}
+ void Handle(TEvStateStorage::TEvReplicaShutdown::TPtr &ev) {
+ const ui32 idx = ev->Cookie;
+ if (idx >= Replicas.size())
+ return;
+ auto &replica = Replicas[idx];
+ Y_VERIFY(replica.Replica == ev->Sender);
+
+ if (replica.State == EReplicaState::Unknown) {
+ ++Stats.Replied;
+ }
+ if (replica.State != EReplicaState::NotAvailable) {
+ replica.State = EReplicaState::NotAvailable;
+ ++Stats.NotAvailable;
+ }
+
+ for (auto infoId : replica.Infos) {
+ InfoReplicas[infoId].erase(idx);
+ }
+
+ CheckCompletion();
+ }
+
void Handle(TEvents::TEvUndelivered::TPtr &ev) {
auto *msg = ev->Get();
if (msg->SourceType != TEvStateStorage::TEvReplicaBoardLookup::EventType)
@@ -161,7 +291,11 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> {
return;
replica.State = EReplicaState::NotAvailable;
++Stats.Replied;
- ++Stats.NoInfo;
+ ++Stats.NotAvailable;
+
+ for (auto infoId : replica.Infos) {
+ InfoReplicas[infoId].erase(idx);
+ }
CheckCompletion();
}
@@ -175,6 +309,7 @@ public:
, Owner(owner)
, Mode(mode)
, StateStorageGroupId(groupId)
+ , Subscriber(Mode == EBoardLookupMode::Subscription)
{}
void Bootstrap() {
@@ -194,16 +329,26 @@ public:
STATEFN(StateLookup) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvStateStorage::TEvReplicaBoardInfo, Handle);
+ hFunc(TEvStateStorage::TEvReplicaBoardInfoUpdate, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+ hFunc(TEvStateStorage::TEvReplicaShutdown, Handle);
+ cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
+ }
+ }
+
+ STATEFN(StateSubscribe) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvStateStorage::TEvReplicaBoardInfoUpdate, Handle);
hFunc(TEvents::TEvUndelivered, Handle);
hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+ hFunc(TEvStateStorage::TEvReplicaShutdown, Handle);
cFunc(TEvents::TEvPoisonPill::EventType, PassAway);
}
}
};
-IActor* CreateBoardLookupActor(const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode, bool sub, bool useNodeSubsriptions) {
- Y_UNUSED(useNodeSubsriptions);
- Y_VERIFY(!sub, "subscribe mode for board lookup not implemented yet");
+IActor* CreateBoardLookupActor(const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode) {
return new TBoardLookupActor(path, owner, mode, groupId);
}
diff --git a/ydb/core/base/board_replica.cpp b/ydb/core/base/board_replica.cpp
index c38fd20e35..c82e1debd8 100644
--- a/ydb/core/base/board_replica.cpp
+++ b/ydb/core/base/board_replica.cpp
@@ -18,6 +18,7 @@
namespace NKikimr {
class TBoardReplicaActor : public TActor<TBoardReplicaActor> {
+
using TOwnerIndex = TMap<TActorId, ui32, TActorId::TOrderedCmp>;
using TPathIndex = TMap<TString, TSet<ui32>>;
@@ -26,11 +27,35 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> {
TActorId Owner;
TOwnerIndex::iterator OwnerIt;
TPathIndex::iterator PathIt;
+ TActorId Session = TActorId();
+ };
+
+ struct TPathSubscribeData {
+ THashMap<TActorId, ui32> Subscribers; // Subcriber -> Cookie
+ };
+
+ struct TSubscriber {
+ TString Path;
+ TActorId Session = TActorId();
};
TVector<TEntry> Entries;
TVector<ui32> AvailableEntries;
+ THashMap<TActorId, TSubscriber> Subscribers;
+ TMap<TString, TPathSubscribeData> PathToSubscribers;
+
+ struct TSessionSubscribers {
+ THashSet<TActorId> Subscribers;
+ THashSet<TActorId> Publishers;
+
+ bool Empty() const {
+ return Subscribers.empty() && Publishers.empty();
+ }
+ };
+
+ THashMap<TActorId, TSessionSubscribers> Sessions; // InterconnectSession -> Session subscribers
+
TOwnerIndex IndexOwner;
TPathIndex IndexPath;
@@ -58,6 +83,8 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> {
return;
}
+ auto pathSubscribeDataIt = PathToSubscribers.find(path);
+
auto ownerIt = IndexOwner.find(owner);
if (ownerIt != IndexOwner.end()) {
const ui32 entryIndex = ownerIt->second;
@@ -68,14 +95,25 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> {
return;
}
+ if (ev->InterconnectSession) {
+ entry.Session = ev->InterconnectSession;
+ }
entry.Payload = record.GetPayload();
Y_VERIFY_DEBUG(entry.Owner == ActorIdFromProto(record.GetOwner()));
+
+ if (pathSubscribeDataIt != PathToSubscribers.end()) {
+ SendUpdateToSubscribers(entry, false);
+ }
+
} else {
const ui32 entryIndex = AllocateEntry();
TEntry &entry = Entries[entryIndex];
entry.Payload = record.GetPayload();
entry.Owner = ActorIdFromProto(record.GetOwner());
+ if (ev->InterconnectSession) {
+ entry.Session = ev->InterconnectSession;
+ }
auto ownerInsPairIt = IndexOwner.emplace(owner, entryIndex);
entry.OwnerIt = ownerInsPairIt.first;
@@ -83,60 +121,115 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> {
entry.PathIt = pathInsPairIt.first;
entry.PathIt->second.emplace(entryIndex);
- Send(owner, new TEvStateStorage::TEvReplicaBoardPublishAck, IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, ev->Cookie);
+ if (pathSubscribeDataIt != PathToSubscribers.end()) {
+ SendUpdateToSubscribers(entry, false);
+ }
}
- }
- bool IsLastEntryOnNode(TOwnerIndex::iterator ownerIt) {
- const ui32 ownerNodeId = ownerIt->first.NodeId();
- if (ownerIt != IndexOwner.begin()) {
- auto x = ownerIt;
- --x;
- if (x->first.NodeId() == ownerNodeId)
- return false;
+ if (ev->InterconnectSession) {
+ auto sessionsIt = Sessions.find(ev->InterconnectSession);
+ if (sessionsIt == Sessions.end()) {
+ Send(ev->InterconnectSession, new TEvents::TEvSubscribe, IEventHandle::FlagTrackDelivery);
+ Sessions[ev->InterconnectSession].Publishers.insert(ev->Sender);
+ } else {
+ sessionsIt->second.Publishers.insert(ev->Sender);
+ }
}
- ++ownerIt;
- if (ownerIt != IndexOwner.end()) {
- if (ownerIt->first.NodeId() == ownerNodeId)
- return false;
- }
+ auto reply = std::make_unique<TEvStateStorage::TEvReplicaBoardPublishAck>();
+ auto resp = std::make_unique<IEventHandle>(
+ owner, SelfId(), reply.release(),
+ IEventHandle::FlagTrackDelivery, ev->Cookie);
- return true;
+ if (ev->InterconnectSession) {
+ resp->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession);
+ }
+ TActivationContext::Send(resp.release());
}
- void CleanupEntry(ui32 entryIndex) {
+ void CleanupEntry(ui32 entryIndex, TActorId session) {
TEntry &entry = Entries[entryIndex];
+
entry.PathIt->second.erase(entryIndex);
if (entry.PathIt->second.empty()) {
IndexPath.erase(entry.PathIt);
}
- if (IsLastEntryOnNode(entry.OwnerIt)) {
- Send(TActivationContext::InterconnectProxy(entry.OwnerIt->first.NodeId()), new TEvents::TEvUnsubscribe());
- }
+ CleanupSessionForPublisher(session, entry.OwnerIt->first);
+
IndexOwner.erase(entry.OwnerIt);
TString().swap(entry.Payload);
entry.Owner = TActorId();
entry.PathIt = IndexPath.end();
entry.OwnerIt = IndexOwner.end();
+ entry.Session = TActorId();
AvailableEntries.emplace_back(entryIndex);
}
- void PassAway() override {
- ui32 prevNode = 0;
- for (auto &xpair : IndexOwner) {
- const ui32 nodeId = xpair.first.NodeId();
- if (nodeId != prevNode) {
- Send(TActivationContext::InterconnectProxy(nodeId), new TEvents::TEvUnsubscribe());
- prevNode = nodeId;
+ void CleanupSubscriber(const TActorId& subscriber, TActorId session) {
+ auto subscriberIt = Subscribers.find(subscriber);
+ if (subscriberIt == Subscribers.end()) {
+ return;
+ }
+
+ CleanupSessionForSubscriber(session, subscriber);
+
+ auto& pathSubscribeData = PathToSubscribers[subscriberIt->second.Path];
+ pathSubscribeData.Subscribers.erase(subscriberIt->first);
+ if (pathSubscribeData.Subscribers.empty()) {
+ PathToSubscribers.erase(subscriberIt->second.Path);
+ }
+ Subscribers.erase(subscriberIt);
+ }
+
+
+ void CleanupSessionForSubscriber(const TActorId& session, const TActorId& subscriber) {
+ if (!session) {
+ return;
+ }
+ auto sessionsIt = Sessions.find(session);
+ if (sessionsIt != Sessions.end()) {
+ sessionsIt->second.Subscribers.erase(subscriber);
+ if (sessionsIt->second.Empty()) {
+ Send(sessionsIt->first, new TEvents::TEvUnsubscribe());
+ Sessions.erase(sessionsIt);
}
+ }
+ }
+
+ void CleanupSessionForPublisher(const TActorId& session, const TActorId& publisher) {
+ if (!session) {
+ return;
+ }
+ auto sessionsIt = Sessions.find(session);
+ if (sessionsIt != Sessions.end()) {
+ sessionsIt->second.Publishers.erase(publisher);
+ if (sessionsIt->second.Empty()) {
+ Send(sessionsIt->first, new TEvents::TEvUnsubscribe());
+ Sessions.erase(sessionsIt);
+ }
+ }
+ }
+
+ void PassAway() override {
+
+ for (const auto& [session, sessionSubscribers] : Sessions) {
+ Send(session, new TEvents::TEvUnsubscribe());
+ }
+ for (const auto& xpair : IndexOwner) {
Send(xpair.first, new TEvStateStorage::TEvReplicaShutdown());
}
+ for (const auto& [path, pathSubscribeData] : PathToSubscribers) {
+ for (const auto& [subscriber, cookie] : pathSubscribeData.Subscribers) {
+ auto reply = MakeHolder<TEvStateStorage::TEvReplicaShutdown>();
+ Send(subscriber, std::move(reply), 0, cookie);
+ }
+ }
+
// all cleanup in actor destructor
TActor::PassAway();
}
@@ -146,7 +239,14 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> {
if (ownerIt == IndexOwner.end()) // do nothing, already removed?
return;
- CleanupEntry(ownerIt->second);
+ const auto& entry = Entries[ownerIt->second];
+ const auto& path = entry.PathIt->first;
+ auto pathSubscribeDataIt = PathToSubscribers.find(path);
+ if (pathSubscribeDataIt != PathToSubscribers.end()) {
+ SendUpdateToSubscribers(entry, true);
+ }
+
+ CleanupEntry(ownerIt->second, ev->InterconnectSession);
}
void Handle(TEvStateStorage::TEvReplicaBoardLookup::TPtr &ev) {
@@ -154,14 +254,36 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> {
const auto &path = record.GetPath();
if (record.GetSubscribe()) {
- BLOG_ERROR("trying to subscribe on path, must be not implemented yet");
- // reply nothing, request suspicious
- return;
+ auto& pathSubscribeData = PathToSubscribers[path];
+ pathSubscribeData.Subscribers[ev->Sender] = ev->Cookie;
+ auto& subscriber = Subscribers[ev->Sender];
+ subscriber.Path = path;
+ if (ev->InterconnectSession) {
+ subscriber.Session = ev->InterconnectSession;
+ auto sessionsIt = Sessions.find(ev->InterconnectSession);
+ if (sessionsIt == Sessions.end()) {
+ Send(ev->InterconnectSession, new TEvents::TEvSubscribe, IEventHandle::FlagTrackDelivery);
+ Sessions[ev->InterconnectSession].Subscribers.insert(ev->Sender);
+ } else {
+ sessionsIt->second.Subscribers.insert(ev->Sender);
+ }
+ }
+ }
+
+ ui32 flags = 0;
+ if (record.GetSubscribe()) {
+ flags = IEventHandle::FlagTrackDelivery;
}
auto pathIt = IndexPath.find(path);
if (pathIt == IndexPath.end()) {
- Send(ev->Sender, new TEvStateStorage::TEvReplicaBoardInfo(path, true), 0, ev->Cookie);
+ auto reply = std::make_unique<TEvStateStorage::TEvReplicaBoardInfo>(path, true);
+ auto resp = std::make_unique<IEventHandle>(
+ ev->Sender, ev->Recipient, reply.release(), flags, ev->Cookie);
+ if (ev->InterconnectSession) {
+ resp->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession);
+ }
+ TActivationContext::Send(resp.release());
return;
}
@@ -175,27 +297,130 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> {
ex->SetPayload(entry.Payload);
}
- Send(ev->Sender, std::move(reply), 0, ev->Cookie);
+ auto resp = std::make_unique<IEventHandle>(
+ ev->Sender, SelfId(), reply.Release(), flags, ev->Cookie);
+ if (ev->InterconnectSession) {
+ resp->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession);
+ }
+ TActivationContext::Send(resp.release());
}
void Handle(TEvents::TEvUndelivered::TPtr &ev) {
- auto ownerIt = IndexOwner.find(ev->Sender);
- if (ownerIt == IndexOwner.end())
+ auto *msg = ev->Get();
+ switch (msg->SourceType) {
+ case TEvents::TEvSubscribe::EventType: {
+ DisconnectSession(ev->Sender);
+ return;
+ }
+ case TEvStateStorage::TEvReplicaBoardInfo::EventType: {
+ auto subscribersIt = Subscribers.find(ev->Sender);
+ if (subscribersIt == Subscribers.end()) {
+ return;
+ }
+ if (subscribersIt->second.Session != ev->InterconnectSession) {
+ return;
+ }
+ CleanupSubscriber(ev->Sender, ev->InterconnectSession);
+ break;
+ }
+ case TEvStateStorage::TEvReplicaBoardPublishAck::EventType: {
+ auto ownerIt = IndexOwner.find(ev->Sender);
+ if (ownerIt == IndexOwner.end())
+ return;
+
+ const auto& entry = Entries[ownerIt->second];
+ if (entry.Session != ev->InterconnectSession) {
+ return;
+ }
+ const auto& path = entry.PathIt->first;
+ auto pathSubscribeDataIt = PathToSubscribers.find(path);
+ if (pathSubscribeDataIt != PathToSubscribers.end()) {
+ SendUpdateToSubscribers(entry, true);
+ }
+
+ CleanupEntry(ownerIt->second, ev->InterconnectSession);
+ break;
+ }
+ default:
+ Y_FAIL("Unexpected case");
+ }
+ }
+
+ void SendUpdateToSubscribers(const TEntry& entry, bool dropped ) {
+ const auto& path = entry.PathIt->first;
+
+ auto pathSubscribeDataIt = PathToSubscribers.find(path);
+ if (pathSubscribeDataIt == PathToSubscribers.end()) {
return;
+ }
+
+ auto& pathSubscribeData = pathSubscribeDataIt->second;
- CleanupEntry(ownerIt->second);
+ NKikimrStateStorage::TEvReplicaBoardInfoUpdate record;
+ auto *info = record.MutableInfo();
+ ActorIdToProto(entry.Owner, info->MutableOwner());
+ if (dropped) {
+ info->SetDropped(true);
+ } else {
+ info->SetPayload(entry.Payload);
+ }
+
+ for (const auto& subscriber : pathSubscribeData.Subscribers) {
+ auto reply = MakeHolder<TEvStateStorage::TEvReplicaBoardInfoUpdate>(path);
+ reply->Record = record;
+ Send(subscriber.first, std::move(reply), 0, subscriber.second);
+ }
}
void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) {
- auto *msg = ev->Get();
- const ui32 nodeId = msg->NodeId;
- auto ownerIt = IndexOwner.lower_bound(TActorId(nodeId, 0, 0, 0));
- while (ownerIt != IndexOwner.end() && ownerIt->first.NodeId() == nodeId) {
- const ui32 entryToCleanupIndex = ownerIt->second;
- ++ownerIt;
- CleanupEntry(entryToCleanupIndex);
+ DisconnectSession(ev->Sender);
+ }
+
+ void Handle(TEvStateStorage::TEvReplicaBoardUnsubscribe::TPtr& ev) {
+ const auto& sender = ev->Sender;
+
+ CleanupSubscriber(sender, ev->InterconnectSession);
+ }
+
+ void DisconnectSession(const TActorId& session) {
+ auto sessionsIt = Sessions.find(session);
+ if (sessionsIt == Sessions.end()) {
+ return;
+ }
+
+ for (const auto& subscriber : sessionsIt->second.Subscribers) {
+ auto subscribersIt = Subscribers.find(subscriber);
+ if (subscribersIt == Subscribers.end()) {
+ continue;
+ }
+ if (subscribersIt->second.Session == session) {
+ CleanupSubscriber(subscriber, TActorId());
+ }
+ }
+
+ for (const auto& publisher : sessionsIt->second.Publishers) {
+ auto ownerIt = IndexOwner.find(publisher);
+ if (ownerIt == IndexOwner.end()) {
+ continue;
+ }
+
+ const auto& entry = Entries[ownerIt->second];
+
+ if (entry.Session != session) {
+ continue;
+ }
+ const auto& path = entry.PathIt->first;
+ auto pathSubscribeDataIt = PathToSubscribers.find(path);
+ if (pathSubscribeDataIt != PathToSubscribers.end()) {
+ SendUpdateToSubscribers(entry, true);
+ }
+
+ CleanupEntry(ownerIt->second, TActorId());
}
+
+ Sessions.erase(sessionsIt);
}
+
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::BOARD_REPLICA_ACTOR;
@@ -213,6 +438,7 @@ public:
hFunc(TEvents::TEvUndelivered, Handle);
cFunc(TEvents::TEvPoison::EventType, PassAway);
hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+ hFunc(TEvStateStorage::TEvReplicaBoardUnsubscribe, Handle);
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
default:
diff --git a/ydb/core/base/board_subscriber_ut.cpp b/ydb/core/base/board_subscriber_ut.cpp
new file mode 100644
index 0000000000..82432dde09
--- /dev/null
+++ b/ydb/core/base/board_subscriber_ut.cpp
@@ -0,0 +1,197 @@
+#include <ydb/core/base/statestorage.h>
+#include <ydb/core/base/statestorage_impl.h>
+
+#include <ydb/core/base/pathid.h>
+#include <ydb/core/testlib/basics/appdata.h>
+#include <ydb/core/testlib/basics/helpers.h>
+
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/interconnect/interconnect_impl.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <ydb/core/testlib/basics/runtime.h>
+
+#include <util/generic/vector.h>
+#include <util/generic/xrange.h>
+
+namespace NKikimr {
+
+class TBoardSubscriberTest: public NUnitTest::TTestBase {
+
+ TActorId CreateSubscriber(const TString& path, const TActorId& owner, ui32 nodeIdx) {
+ const TActorId subscriber = Context->Register(
+ CreateBoardLookupActor(path, owner, 0, EBoardLookupMode::Subscription), nodeIdx
+ );
+ Context->EnableScheduleForActor(subscriber);
+ return subscriber;
+ }
+
+ TActorId CreatePublisher(
+ const TString& path, const TString& payload, const TActorId& owner, ui32 nodeIdx) {
+ const TActorId publisher = Context->Register(
+ CreateBoardPublishActor(path, payload, owner, 0, 0, true), nodeIdx
+ );
+ Context->EnableScheduleForActor(publisher);
+ return publisher;
+ }
+
+ void Send(
+ const TActorId& recipient,
+ const TActorId& sender,
+ IEventBase* ev,
+ ui32 nodeIdx,
+ bool viaActorSystem = false) {
+ Context->Send(new IEventHandle(recipient, sender, ev, 0, 0), nodeIdx, viaActorSystem);
+ }
+
+ void KillPublisher(const TActorId& owner, const TActorId& publisher, ui32 nodeIdx) {
+ Send(publisher, owner, new TEvents::TEvPoisonPill(), nodeIdx);
+ }
+
+ void Disconnect(ui32 nodeIndexFrom, ui32 nodeIndexTo) {
+ const TActorId proxy = Context->GetInterconnectProxy(nodeIndexFrom, nodeIndexTo);
+
+ Send(proxy, TActorId(), new TEvInterconnect::TEvDisconnect(), nodeIndexFrom, true);
+
+ //Wait for event TEvInterconnect::EvNodeDisconnected
+ TDispatchOptions options;
+ options.FinalEvents.emplace_back(TEvInterconnect::EvNodeDisconnected);
+ Context->DispatchEvents(options);
+ }
+
+public:
+ void SetUp() override {
+ Context = MakeHolder<TTestBasicRuntime>(2);
+
+ for (ui32 i : xrange(Context->GetNodeCount())) {
+ SetupStateStorage(*Context, i, 0);
+ }
+
+ Context->Initialize(TAppPrepare().Unwrap());
+ }
+
+ void TearDown() override {
+ Context.Reset();
+ }
+
+ UNIT_TEST_SUITE(TBoardSubscriberTest);
+ UNIT_TEST(SimpleSubscriber);
+ UNIT_TEST(ManySubscribersManyPublisher);
+ UNIT_TEST(DisconnectReplica);
+ UNIT_TEST_SUITE_END();
+
+ void SimpleSubscriber();
+ void ManySubscribersManyPublisher();
+ void DisconnectReplica();
+
+private:
+ THolder<TTestBasicRuntime> Context;
+};
+
+UNIT_TEST_SUITE_REGISTRATION(TBoardSubscriberTest);
+
+void TBoardSubscriberTest::SimpleSubscriber() {
+ const auto edgeSubscriber = Context->AllocateEdgeActor(0);
+ CreateSubscriber("path", edgeSubscriber, 0);
+
+ {
+ auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber);
+ UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
+ UNIT_ASSERT(event->Get()->InfoEntries.empty());
+ }
+
+ const auto edgePublisher = Context->AllocateEdgeActor(1);
+ const TActorId publisher = CreatePublisher("path", "test", edgePublisher, 1);
+
+ {
+ auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber);
+ UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
+ const auto &update = event->Get()->Update;
+
+ UNIT_ASSERT(!update.Dropped);
+ UNIT_ASSERT_EQUAL(update.Owner, publisher);
+ UNIT_ASSERT_EQUAL(update.Payload, "test");
+ }
+
+ KillPublisher(edgePublisher, publisher, 1);
+
+ {
+ auto event = Context->GrabEdgeEventIf<TEvStateStorage::TEvBoardInfoUpdate>(
+ {edgeSubscriber}, [edgeSubscriber](const auto& ev){
+ const auto &update = ev->Get()->Update;
+ if (ev->Recipient == edgeSubscriber && update.Dropped) {
+ return true;
+ }
+ return false;
+ });
+ UNIT_ASSERT(event);
+ UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
+ const auto &update = event->Get()->Update;
+
+ UNIT_ASSERT(update.Dropped);
+ UNIT_ASSERT_EQUAL(update.Owner, publisher);
+ }
+}
+
+void TBoardSubscriberTest::ManySubscribersManyPublisher() {
+ size_t subscribersCount = 10;
+ size_t publishersCount = 10;
+
+ THashSet<TActorId> subscribers;
+
+ for (size_t i = 0; i < subscribersCount; i++) {
+ const auto edgeSubscriber = Context->AllocateEdgeActor(i % 2);
+ CreateSubscriber("path", edgeSubscriber, i % 2);
+ subscribers.insert(edgeSubscriber);
+ {
+ auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber);
+ UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
+ UNIT_ASSERT(event->Get()->InfoEntries.empty());
+ }
+ }
+
+ for (size_t i = 0; i < publishersCount; i++) {
+ const auto edgePublisher = Context->AllocateEdgeActor(i % 2);
+ const auto publisher = CreatePublisher("path", ToString(i), edgePublisher, i % 2);
+ for (const auto& subscriber : subscribers) {
+ auto event = Context->GrabEdgeEventIf<TEvStateStorage::TEvBoardInfoUpdate>(
+ {subscriber}, [publisher](const auto& ev) mutable {
+ if (ev->Get()->Update.Owner == publisher) {
+ return true;
+ }
+ return false;
+ });
+ UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
+ UNIT_ASSERT_EQUAL(event->Get()->Update.Payload, ToString(i));
+ }
+ }
+}
+
+void TBoardSubscriberTest::DisconnectReplica() {
+
+ std::vector<TActorId> replicas;
+
+ {
+ const auto edgeSubscriber = Context->AllocateEdgeActor(1);
+ CreateSubscriber("path", edgeSubscriber, 1);
+
+ {
+ auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber);
+ UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok);
+ UNIT_ASSERT_EQUAL(event->Get()->Path, "path");
+ UNIT_ASSERT(event->Get()->InfoEntries.empty());
+ }
+
+ Disconnect(1, 0);
+
+ auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber);
+ UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable);
+ }
+}
+
+} // NKikimr
diff --git a/ydb/core/base/statestorage.h b/ydb/core/base/statestorage.h
index df49ba869a..f3a7c91608 100644
--- a/ydb/core/base/statestorage.h
+++ b/ydb/core/base/statestorage.h
@@ -26,6 +26,7 @@ struct TEvStateStorage {
EvListSchemeBoard, // all
EvUpdateGroupConfig,
EvListStateStorage,
+ EvBoardInfoUpdate,
// replies (local, from proxy)
EvInfo = EvLookup + 512,
@@ -54,9 +55,11 @@ struct TEvStateStorage {
EvReplicaBoardPublish = EvLock + 4 * 512,
EvReplicaBoardLookup,
EvReplicaBoardCleanup,
+ EvReplicaBoardUnsubscribe,
EvReplicaBoardPublishAck = EvLock + 5 * 512,
EvReplicaBoardInfo,
+ EvReplicaBoardInfoUpdate,
EvReplicaProbeSubscribe = EvLock + 6 * 512,
EvReplicaProbeUnsubscribe,
@@ -371,8 +374,10 @@ struct TEvStateStorage {
struct TEvReplicaBoardPublish;
struct TEvReplicaBoardLookup;
struct TEvReplicaBoardCleanup;
+ struct TEvReplicaBoardUnsubscribe;
struct TEvReplicaBoardPublishAck;
struct TEvReplicaBoardInfo;
+ struct TEvReplicaBoardInfoUpdate;
struct TEvListSchemeBoard;
struct TEvListSchemeBoardResult;
struct TEvListStateStorage;
@@ -450,6 +455,30 @@ struct TEvStateStorage {
, InfoEntries(x.InfoEntries)
{}
};
+
+ struct TEvBoardInfoUpdate : public TEventLocal<TEvBoardInfoUpdate, EvBoardInfoUpdate> {
+
+ struct TInfoEntryUpdate {
+ TActorId Owner;
+ TString Payload;
+ bool Dropped = false;
+ };
+
+ const TEvBoardInfo::EStatus Status;
+ const TString Path;
+ TInfoEntryUpdate Update;
+
+ TEvBoardInfoUpdate(TEvBoardInfo::EStatus status, const TString &path)
+ : Status(status)
+ , Path(path)
+ {}
+
+ TEvBoardInfoUpdate(const TEvBoardInfoUpdate &x)
+ : Status(x.Status)
+ , Path(x.Path)
+ , Update(x.Update)
+ {}
+ };
};
struct TStateStorageInfo : public TThrRefBase {
@@ -512,6 +541,7 @@ enum class EBoardLookupMode {
FirstNonEmptyDoubleTime,
SecondNonEmptyDoubleTime,
MajorityDoubleTime,
+ Subscription,
};
TIntrusivePtr<TStateStorageInfo> BuildStateStorageInfo(char (&namePrefix)[TActorId::MaxServiceIDLength], const NKikimrConfig::TDomainsConfig::TStateStorage& config);
@@ -530,7 +560,7 @@ IActor* CreateStateStorageTabletGuardian(ui64 tabletId, const TActorId &leader,
IActor* CreateStateStorageFollowerGuardian(ui64 tabletId, const TActorId &follower); // created as followerCandidate
IActor* CreateStateStorageBoardReplica(const TIntrusivePtr<TStateStorageInfo> &, ui32);
IActor* CreateSchemeBoardReplica(const TIntrusivePtr<TStateStorageInfo>&, ui32);
-IActor* CreateBoardLookupActor(const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode, bool sub, bool useNodeSubscriptions);
+IActor* CreateBoardLookupActor(const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode);
IActor* CreateBoardPublishActor(const TString &path, const TString &payload, const TActorId &owner, ui32 groupId, ui32 ttlMs, bool reg);
TString MakeEndpointsBoardPath(const TString &database);
diff --git a/ydb/core/base/statestorage_impl.h b/ydb/core/base/statestorage_impl.h
index 8951dc6c4b..669518eea9 100644
--- a/ydb/core/base/statestorage_impl.h
+++ b/ydb/core/base/statestorage_impl.h
@@ -305,12 +305,18 @@ struct TEvStateStorage::TEvReplicaBoardCleanup : public TEventPB<TEvStateStorage
{}
};
+struct TEvStateStorage::TEvReplicaBoardUnsubscribe : public TEventPB<TEvStateStorage::TEvReplicaBoardUnsubscribe, NKikimrStateStorage::TEvReplicaBoardUnsubscribe, TEvStateStorage::EvReplicaBoardUnsubscribe> {
+ TEvReplicaBoardUnsubscribe()
+ {}
+};
+
struct TEvStateStorage::TEvReplicaBoardPublishAck : public TEventPB<TEvStateStorage::TEvReplicaBoardPublishAck, NKikimrStateStorage::TEvReplicaBoardPublishAck, TEvStateStorage::EvReplicaBoardPublishAck> {
TEvReplicaBoardPublishAck()
{}
};
struct TEvStateStorage::TEvReplicaBoardInfo : public TEventPB<TEvStateStorage::TEvReplicaBoardInfo, NKikimrStateStorage::TEvReplicaBoardInfo, TEvStateStorage::EvReplicaBoardInfo> {
+
TEvReplicaBoardInfo()
{}
@@ -321,6 +327,17 @@ struct TEvStateStorage::TEvReplicaBoardInfo : public TEventPB<TEvStateStorage::T
}
};
+struct TEvStateStorage::TEvReplicaBoardInfoUpdate : public TEventPB<TEvStateStorage::TEvReplicaBoardInfoUpdate, NKikimrStateStorage::TEvReplicaBoardInfoUpdate, TEvStateStorage::EvReplicaBoardInfoUpdate> {
+
+ TEvReplicaBoardInfoUpdate()
+ {}
+
+ TEvReplicaBoardInfoUpdate(const TString &path)
+ {
+ Record.SetPath(path);
+ }
+};
+
IActor* CreateStateStorageReplicaProbe(TActorId replica);
}
diff --git a/ydb/core/base/ut_board_subscriber/CMakeLists.darwin-x86_64.txt b/ydb/core/base/ut_board_subscriber/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..b6884fef03
--- /dev/null
+++ b/ydb/core/base/ut_board_subscriber/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,82 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-base-ut_board_subscriber)
+target_compile_options(ydb-core-base-ut_board_subscriber PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-base-ut_board_subscriber PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/base
+)
+target_link_libraries(ydb-core-base-ut_board_subscriber PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-core-base
+ cpp-actors-interconnect
+ cpp-actors-core
+ cpp-testing-unittest
+ core-testlib-basics
+ testlib-basics-default
+)
+target_link_options(ydb-core-base-ut_board_subscriber PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-core-base-ut_board_subscriber PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/base/board_subscriber_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-core-base-ut_board_subscriber
+ TEST_TARGET
+ ydb-core-base-ut_board_subscriber
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ TIMEOUT
+ 600
+)
+target_allocator(ydb-core-base-ut_board_subscriber
+ system_allocator
+)
+vcs_info(ydb-core-base-ut_board_subscriber)
diff --git a/ydb/core/base/ut_board_subscriber/CMakeLists.linux-aarch64.txt b/ydb/core/base/ut_board_subscriber/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..b003f94e45
--- /dev/null
+++ b/ydb/core/base/ut_board_subscriber/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,85 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-base-ut_board_subscriber)
+target_compile_options(ydb-core-base-ut_board_subscriber PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-base-ut_board_subscriber PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/base
+)
+target_link_libraries(ydb-core-base-ut_board_subscriber PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ cpp-testing-unittest_main
+ ydb-core-base
+ cpp-actors-interconnect
+ cpp-actors-core
+ cpp-testing-unittest
+ core-testlib-basics
+ testlib-basics-default
+)
+target_link_options(ydb-core-base-ut_board_subscriber PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-core-base-ut_board_subscriber PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/base/board_subscriber_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-core-base-ut_board_subscriber
+ TEST_TARGET
+ ydb-core-base-ut_board_subscriber
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ TIMEOUT
+ 600
+)
+target_allocator(ydb-core-base-ut_board_subscriber
+ cpp-malloc-jemalloc
+)
+vcs_info(ydb-core-base-ut_board_subscriber)
diff --git a/ydb/core/base/ut_board_subscriber/CMakeLists.linux-x86_64.txt b/ydb/core/base/ut_board_subscriber/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..547fbd0857
--- /dev/null
+++ b/ydb/core/base/ut_board_subscriber/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,87 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-base-ut_board_subscriber)
+target_compile_options(ydb-core-base-ut_board_subscriber PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-base-ut_board_subscriber PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/base
+)
+target_link_libraries(ydb-core-base-ut_board_subscriber PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-core-base
+ cpp-actors-interconnect
+ cpp-actors-core
+ cpp-testing-unittest
+ core-testlib-basics
+ testlib-basics-default
+)
+target_link_options(ydb-core-base-ut_board_subscriber PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-core-base-ut_board_subscriber PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/base/board_subscriber_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-core-base-ut_board_subscriber
+ TEST_TARGET
+ ydb-core-base-ut_board_subscriber
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ TIMEOUT
+ 600
+)
+target_allocator(ydb-core-base-ut_board_subscriber
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+)
+vcs_info(ydb-core-base-ut_board_subscriber)
diff --git a/ydb/core/base/ut_board_subscriber/CMakeLists.txt b/ydb/core/base/ut_board_subscriber/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/base/ut_board_subscriber/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/base/ut_board_subscriber/CMakeLists.windows-x86_64.txt b/ydb/core/base/ut_board_subscriber/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..ac56b91954
--- /dev/null
+++ b/ydb/core/base/ut_board_subscriber/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,75 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-core-base-ut_board_subscriber)
+target_compile_options(ydb-core-base-ut_board_subscriber PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-core-base-ut_board_subscriber PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/base
+)
+target_link_libraries(ydb-core-base-ut_board_subscriber PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-core-base
+ cpp-actors-interconnect
+ cpp-actors-core
+ cpp-testing-unittest
+ core-testlib-basics
+ testlib-basics-default
+)
+target_sources(ydb-core-base-ut_board_subscriber PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/base/board_subscriber_ut.cpp
+)
+set_property(
+ TARGET
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ SPLIT_FACTOR
+ 10
+)
+add_yunittest(
+ NAME
+ ydb-core-base-ut_board_subscriber
+ TEST_TARGET
+ ydb-core-base-ut_board_subscriber
+ TEST_ARG
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+set_yunittest_property(
+ TEST
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ LABELS
+ MEDIUM
+)
+set_yunittest_property(
+ TEST
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ PROCESSORS
+ 1
+)
+set_yunittest_property(
+ TEST
+ ydb-core-base-ut_board_subscriber
+ PROPERTY
+ TIMEOUT
+ 600
+)
+target_allocator(ydb-core-base-ut_board_subscriber
+ system_allocator
+)
+vcs_info(ydb-core-base-ut_board_subscriber)
diff --git a/ydb/core/discovery/discovery.cpp b/ydb/core/discovery/discovery.cpp
index 70436d4853..a124bcecda 100644
--- a/ydb/core/discovery/discovery.cpp
+++ b/ydb/core/discovery/discovery.cpp
@@ -206,7 +206,7 @@ namespace NDiscoveryPrivate {
if (result.second) {
CLOG_D("Lookup"
<< ": path# " << database);
- Register(CreateBoardLookupActor(database, SelfId(), groupId, EBoardLookupMode::Second, false, false));
+ Register(CreateBoardLookupActor(database, SelfId(), groupId, EBoardLookupMode::Second));
}
return result.first;
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index ebcd32c338..477628da90 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -742,7 +742,7 @@ public:
}
if (PublishBoardPath) {
- auto actor = CreateBoardLookupActor(PublishBoardPath, SelfId(), *groupId, EBoardLookupMode::Majority, false, false);
+ auto actor = CreateBoardLookupActor(PublishBoardPath, SelfId(), *groupId, EBoardLookupMode::Majority);
BoardLookupActor = Register(actor);
}
}
diff --git a/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp b/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp
index 6448d0482e..b7ea80d247 100644
--- a/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp
+++ b/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp
@@ -40,8 +40,7 @@ public:
, Callback(std::move(callback)) {}
void Bootstrap() {
- auto boardLookup = CreateBoardLookupActor(BoardPath, SelfId(), StateStorageGroupId, EBoardLookupMode::Majority,
- false, false);
+ auto boardLookup = CreateBoardLookupActor(BoardPath, SelfId(), StateStorageGroupId, EBoardLookupMode::Majority);
BoardLookupId = Register(boardLookup);
Become(&TTakeResourcesSnapshotActor::WorkState);
diff --git a/ydb/core/load_test/service_actor.cpp b/ydb/core/load_test/service_actor.cpp
index df8ebdc2e8..3b5c0ea337 100644
--- a/ydb/core/load_test/service_actor.cpp
+++ b/ydb/core/load_test/service_actor.cpp
@@ -703,9 +703,7 @@ public:
RegisterWithSameMailbox(CreateBoardLookupActor(MakeEndpointsBoardPath(name),
SelfId(),
domainInfo->DefaultStateStorageGroup,
- EBoardLookupMode::Second,
- false,
- false));
+ EBoardLookupMode::Second));
tag = modifiedRequest.GetTag();
uuid = modifiedRequest.GetUuid();
}
diff --git a/ydb/core/mind/tenant_node_enumeration.cpp b/ydb/core/mind/tenant_node_enumeration.cpp
index e40380a925..045cefe738 100644
--- a/ydb/core/mind/tenant_node_enumeration.cpp
+++ b/ydb/core/mind/tenant_node_enumeration.cpp
@@ -99,7 +99,7 @@ public:
return ReportErrorAndDie();
const TString path = MakeTenantNodeEnumerationPath(TenantName);
- LookupActor = Register(CreateBoardLookupActor(path, SelfId(), statestorageGroupId, EBoardLookupMode::Majority, false, false));
+ LookupActor = Register(CreateBoardLookupActor(path, SelfId(), statestorageGroupId, EBoardLookupMode::Majority));
Become(&TThis::StateWait);
}
diff --git a/ydb/core/protos/statestorage.proto b/ydb/core/protos/statestorage.proto
index 4165af89ba..d746ca2917 100644
--- a/ydb/core/protos/statestorage.proto
+++ b/ydb/core/protos/statestorage.proto
@@ -103,16 +103,25 @@ message TEvReplicaBoardLookup {
message TEvReplicaBoardCleanup {
};
+message TEvReplicaBoardUnsubscribe {
+};
+
message TBoardEntryInfo {
optional NActorsProto.TActorId Owner = 1;
optional bytes Payload = 2;
+ optional bool Dropped = 3 [default = false];
};
message TEvReplicaBoardInfo {
optional string Path = 1;
optional bool Dropped = 2;
repeated TBoardEntryInfo Info = 3;
-}
+};
+
+message TEvReplicaBoardInfoUpdate {
+ optional string Path = 1;
+ optional TBoardEntryInfo Info = 3;
+};
message TEndpointBoardEntry {
optional string Address = 1;
diff --git a/ydb/core/viewer/json_pipe_req.h b/ydb/core/viewer/json_pipe_req.h
index 4237bc4a29..8946e3d5a1 100644
--- a/ydb/core/viewer/json_pipe_req.h
+++ b/ydb/core/viewer/json_pipe_req.h
@@ -194,9 +194,7 @@ protected:
TBase::RegisterWithSameMailbox(CreateBoardLookupActor(MakeEndpointsBoardPath(path),
TBase::SelfId(),
domainInfo->DefaultStateStorageGroup,
- EBoardLookupMode::Second,
- false,
- false));
+ EBoardLookupMode::Second));
++Requests;
}