diff options
author | shumkovnd <shumkovnd@yandex-team.com> | 2023-05-23 20:17:49 +0300 |
---|---|---|
committer | shumkovnd <shumkovnd@yandex-team.com> | 2023-05-23 20:17:49 +0300 |
commit | cf214e2939c005ecb728689e108180a752bacbc9 (patch) | |
tree | f2b0c8e8e749100b22841593e49586821e6554c6 | |
parent | 78300d3d4b282c3c948e3ad181ec183ce32adcc5 (diff) | |
download | ydb-cf214e2939c005ecb728689e108180a752bacbc9.tar.gz |
board lookup subscribers
21 files changed, 1047 insertions, 78 deletions
diff --git a/ydb/core/base/CMakeLists.darwin-x86_64.txt b/ydb/core/base/CMakeLists.darwin-x86_64.txt index bef496eecc..dc2eb3c54b 100644 --- a/ydb/core/base/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/base/CMakeLists.darwin-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(services) add_subdirectory(ut) +add_subdirectory(ut_board_subscriber) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency diff --git a/ydb/core/base/CMakeLists.linux-aarch64.txt b/ydb/core/base/CMakeLists.linux-aarch64.txt index c66927f294..1ea9e10a1b 100644 --- a/ydb/core/base/CMakeLists.linux-aarch64.txt +++ b/ydb/core/base/CMakeLists.linux-aarch64.txt @@ -8,6 +8,7 @@ add_subdirectory(services) add_subdirectory(ut) +add_subdirectory(ut_board_subscriber) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency diff --git a/ydb/core/base/CMakeLists.linux-x86_64.txt b/ydb/core/base/CMakeLists.linux-x86_64.txt index c66927f294..1ea9e10a1b 100644 --- a/ydb/core/base/CMakeLists.linux-x86_64.txt +++ b/ydb/core/base/CMakeLists.linux-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(services) add_subdirectory(ut) +add_subdirectory(ut_board_subscriber) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency diff --git a/ydb/core/base/CMakeLists.windows-x86_64.txt b/ydb/core/base/CMakeLists.windows-x86_64.txt index bef496eecc..dc2eb3c54b 100644 --- a/ydb/core/base/CMakeLists.windows-x86_64.txt +++ b/ydb/core/base/CMakeLists.windows-x86_64.txt @@ -8,6 +8,7 @@ add_subdirectory(services) add_subdirectory(ut) +add_subdirectory(ut_board_subscriber) get_built_tool_path( TOOL_enum_parser_bin TOOL_enum_parser_dependency diff --git a/ydb/core/base/board_lookup.cpp b/ydb/core/base/board_lookup.cpp index 890b88f6b9..28959ab861 100644 --- a/ydb/core/base/board_lookup.cpp +++ b/ydb/core/base/board_lookup.cpp @@ -24,6 +24,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { const TActorId Owner; const EBoardLookupMode Mode; const ui32 StateStorageGroupId; + const bool Subscriber; enum class EReplicaState { Unknown, @@ -35,10 +36,13 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { struct TReplica { TActorId Replica; EReplicaState State = EReplicaState::Unknown; + THashSet<TActorId> Infos; }; TVector<TReplica> Replicas; + TMap<TActorId, TEvStateStorage::TEvBoardInfo::TInfoEntry> Info; + THashMap<TActorId, THashSet<ui32>> InfoReplicas; ui32 WaitForReplicasToSuccess; @@ -46,31 +50,63 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { ui32 Replied = 0; ui32 NoInfo = 0; ui32 HasInfo = 0; + ui32 NotAvailable = 0; } Stats; void PassAway() override { - for (const auto &replica : Replicas) - if (replica.Replica.NodeId() != SelfId().NodeId()) + for (const auto &replica : Replicas) { + if (Subscriber) { + Send(replica.Replica, new TEvStateStorage::TEvReplicaBoardUnsubscribe()); + } + if (replica.Replica.NodeId() != SelfId().NodeId()) { Send(TActivationContext::InterconnectProxy(replica.Replica.NodeId()), new TEvents::TEvUnsubscribe()); + } + } TActor::PassAway(); } void NotAvailable() { - Send(Owner, new TEvStateStorage::TEvBoardInfo(TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path)); + if (CurrentStateFunc() != &TThis::StateSubscribe) { + Send(Owner, new TEvStateStorage::TEvBoardInfo(TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path)); + } else { + Send(Owner, + new TEvStateStorage::TEvBoardInfoUpdate( + TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable, Path + ) + ); + } return PassAway(); } void CheckCompletion() { - if (Stats.HasInfo == WaitForReplicasToSuccess) { - auto reply = MakeHolder<TEvStateStorage::TEvBoardInfo>(TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path); - reply->InfoEntries = std::move(Info); - Send(Owner, std::move(reply)); + if (CurrentStateFunc() != &TThis::StateSubscribe) { + if ((!Subscriber && Stats.HasInfo == WaitForReplicasToSuccess) || + (Subscriber && Stats.HasInfo + Stats.NoInfo == WaitForReplicasToSuccess)) { + auto reply = MakeHolder<TEvStateStorage::TEvBoardInfo>( + TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path); + reply->InfoEntries = std::move(Info); + Send(Owner, std::move(reply)); + if (Subscriber) { + Become(&TThis::StateSubscribe); + return; + } + return PassAway(); + } - return PassAway(); + if (!Subscriber) { + if (Stats.Replied == Replicas.size()) { + return NotAvailable(); + } + } else { + if (Stats.NotAvailable > (Replicas.size() - WaitForReplicasToSuccess)) { + return NotAvailable(); + } + } + } else { + if (Stats.NotAvailable > (Replicas.size() - WaitForReplicasToSuccess)) { + return NotAvailable(); + } } - - if (Stats.Replied == Replicas.size()) - return NotAvailable(); } void Handle(TEvStateStorage::TEvResolveReplicasList::TPtr &ev) { @@ -84,7 +120,9 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { Replicas.resize(msg->Replicas.size()); for (auto idx : xrange(msg->Replicas.size())) { const TActorId &replica = msg->Replicas[idx]; - Send(replica, new TEvStateStorage::TEvReplicaBoardLookup(Path, TActorId(), false), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, idx); + Send(replica, + new TEvStateStorage::TEvReplicaBoardLookup(Path, TActorId(), Subscriber), + IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, idx); Replicas[idx].Replica = replica; Replicas[idx].State = EReplicaState::Unknown; } @@ -100,6 +138,7 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { break; case EBoardLookupMode::Majority: case EBoardLookupMode::MajorityDoubleTime: + case EBoardLookupMode::Subscription: WaitForReplicasToSuccess = (Replicas.size() / 2 + 1); break; default: @@ -109,6 +148,65 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { Become(&TThis::StateLookup); } + void Handle(TEvStateStorage::TEvReplicaBoardInfoUpdate::TPtr &ev) { + const auto &record = ev->Get()->Record; + const ui32 idx = ev->Cookie; + if (idx >= Replicas.size()) { + return; + } + auto &replica = Replicas[idx]; + if (replica.State == EReplicaState::NotAvailable) { + return; + } + + replica.State = EReplicaState::Ready; + + auto& info = record.GetInfo(); + const TActorId oid = ActorIdFromProto(info.GetOwner()); + + auto& replicas = InfoReplicas[oid]; + if (info.GetDropped()) { + replicas.erase(idx); + replica.Infos.erase(oid); + } else { + replicas.insert(idx); + replica.Infos.insert(oid); + } + + if (CurrentStateFunc() == &TThis::StateSubscribe) { + TEvStateStorage::TEvBoardInfoUpdate::TInfoEntryUpdate update; + update.Owner = oid; + if (info.GetDropped()) { + if (!replicas.empty()) { + return; + } + InfoReplicas.erase(oid); + Info.erase(oid); + update.Dropped = true; + } else { + if (Info[oid].Payload != info.GetPayload()) { + Info[oid].Payload = info.GetPayload(); + update.Payload = std::move(info.GetPayload()); + } + } + + auto reply = MakeHolder<TEvStateStorage::TEvBoardInfoUpdate>( + TEvStateStorage::TEvBoardInfo::EStatus::Ok, Path); + reply->Update = std::move(update); + Send(Owner, std::move(reply)); + } else { + if (info.GetDropped()) { + if (!replicas.empty()) { + return; + } + InfoReplicas.erase(oid); + Info.erase(oid); + } else { + Info[oid].Payload = std::move(info.GetPayload()); + } + } + } + void Handle(TEvStateStorage::TEvReplicaBoardInfo::TPtr &ev) { const auto &record = ev->Get()->Record; const ui32 idx = ev->Cookie; @@ -117,7 +215,6 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { auto &replica = Replicas[idx]; if (replica.State != EReplicaState::Unknown) return; - ++Stats.Replied; if (record.GetDropped()) { replica.State = EReplicaState::NoInfo; @@ -129,7 +226,9 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { for (auto &x : record.GetInfo()) { const TActorId oid = ActorIdFromProto(x.GetOwner()); - Info[oid].Payload = x.GetPayload(); + Info[oid].Payload = std::move(x.GetPayload()); + InfoReplicas[oid].insert(idx); + replica.Infos.insert(oid); } } @@ -138,17 +237,48 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) { const ui32 nodeId = ev->Get()->NodeId; - for (auto &replica : Replicas) { - if (replica.Replica.NodeId() == nodeId && replica.State == EReplicaState::Unknown) { - replica.State = EReplicaState::NotAvailable; - ++Stats.Replied; - ++Stats.NoInfo; + for (ui32 idx = 0; idx < Replicas.size(); idx++) { + auto& replica = Replicas[idx]; + if (replica.Replica.NodeId() == nodeId) { + if (replica.State == EReplicaState::Unknown) { + ++Stats.Replied; + } + if (replica.State != EReplicaState::NotAvailable) { + replica.State = EReplicaState::NotAvailable; + ++Stats.NotAvailable; + } + + for (auto infoId : replica.Infos) { + InfoReplicas[infoId].erase(idx); + } } } CheckCompletion(); } + void Handle(TEvStateStorage::TEvReplicaShutdown::TPtr &ev) { + const ui32 idx = ev->Cookie; + if (idx >= Replicas.size()) + return; + auto &replica = Replicas[idx]; + Y_VERIFY(replica.Replica == ev->Sender); + + if (replica.State == EReplicaState::Unknown) { + ++Stats.Replied; + } + if (replica.State != EReplicaState::NotAvailable) { + replica.State = EReplicaState::NotAvailable; + ++Stats.NotAvailable; + } + + for (auto infoId : replica.Infos) { + InfoReplicas[infoId].erase(idx); + } + + CheckCompletion(); + } + void Handle(TEvents::TEvUndelivered::TPtr &ev) { auto *msg = ev->Get(); if (msg->SourceType != TEvStateStorage::TEvReplicaBoardLookup::EventType) @@ -161,7 +291,11 @@ class TBoardLookupActor : public TActorBootstrapped<TBoardLookupActor> { return; replica.State = EReplicaState::NotAvailable; ++Stats.Replied; - ++Stats.NoInfo; + ++Stats.NotAvailable; + + for (auto infoId : replica.Infos) { + InfoReplicas[infoId].erase(idx); + } CheckCompletion(); } @@ -175,6 +309,7 @@ public: , Owner(owner) , Mode(mode) , StateStorageGroupId(groupId) + , Subscriber(Mode == EBoardLookupMode::Subscription) {} void Bootstrap() { @@ -194,16 +329,26 @@ public: STATEFN(StateLookup) { switch (ev->GetTypeRewrite()) { hFunc(TEvStateStorage::TEvReplicaBoardInfo, Handle); + hFunc(TEvStateStorage::TEvReplicaBoardInfoUpdate, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + hFunc(TEvStateStorage::TEvReplicaShutdown, Handle); + cFunc(TEvents::TEvPoisonPill::EventType, PassAway); + } + } + + STATEFN(StateSubscribe) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvStateStorage::TEvReplicaBoardInfoUpdate, Handle); hFunc(TEvents::TEvUndelivered, Handle); hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + hFunc(TEvStateStorage::TEvReplicaShutdown, Handle); cFunc(TEvents::TEvPoisonPill::EventType, PassAway); } } }; -IActor* CreateBoardLookupActor(const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode, bool sub, bool useNodeSubsriptions) { - Y_UNUSED(useNodeSubsriptions); - Y_VERIFY(!sub, "subscribe mode for board lookup not implemented yet"); +IActor* CreateBoardLookupActor(const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode) { return new TBoardLookupActor(path, owner, mode, groupId); } diff --git a/ydb/core/base/board_replica.cpp b/ydb/core/base/board_replica.cpp index c38fd20e35..c82e1debd8 100644 --- a/ydb/core/base/board_replica.cpp +++ b/ydb/core/base/board_replica.cpp @@ -18,6 +18,7 @@ namespace NKikimr { class TBoardReplicaActor : public TActor<TBoardReplicaActor> { + using TOwnerIndex = TMap<TActorId, ui32, TActorId::TOrderedCmp>; using TPathIndex = TMap<TString, TSet<ui32>>; @@ -26,11 +27,35 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> { TActorId Owner; TOwnerIndex::iterator OwnerIt; TPathIndex::iterator PathIt; + TActorId Session = TActorId(); + }; + + struct TPathSubscribeData { + THashMap<TActorId, ui32> Subscribers; // Subcriber -> Cookie + }; + + struct TSubscriber { + TString Path; + TActorId Session = TActorId(); }; TVector<TEntry> Entries; TVector<ui32> AvailableEntries; + THashMap<TActorId, TSubscriber> Subscribers; + TMap<TString, TPathSubscribeData> PathToSubscribers; + + struct TSessionSubscribers { + THashSet<TActorId> Subscribers; + THashSet<TActorId> Publishers; + + bool Empty() const { + return Subscribers.empty() && Publishers.empty(); + } + }; + + THashMap<TActorId, TSessionSubscribers> Sessions; // InterconnectSession -> Session subscribers + TOwnerIndex IndexOwner; TPathIndex IndexPath; @@ -58,6 +83,8 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> { return; } + auto pathSubscribeDataIt = PathToSubscribers.find(path); + auto ownerIt = IndexOwner.find(owner); if (ownerIt != IndexOwner.end()) { const ui32 entryIndex = ownerIt->second; @@ -68,14 +95,25 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> { return; } + if (ev->InterconnectSession) { + entry.Session = ev->InterconnectSession; + } entry.Payload = record.GetPayload(); Y_VERIFY_DEBUG(entry.Owner == ActorIdFromProto(record.GetOwner())); + + if (pathSubscribeDataIt != PathToSubscribers.end()) { + SendUpdateToSubscribers(entry, false); + } + } else { const ui32 entryIndex = AllocateEntry(); TEntry &entry = Entries[entryIndex]; entry.Payload = record.GetPayload(); entry.Owner = ActorIdFromProto(record.GetOwner()); + if (ev->InterconnectSession) { + entry.Session = ev->InterconnectSession; + } auto ownerInsPairIt = IndexOwner.emplace(owner, entryIndex); entry.OwnerIt = ownerInsPairIt.first; @@ -83,60 +121,115 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> { entry.PathIt = pathInsPairIt.first; entry.PathIt->second.emplace(entryIndex); - Send(owner, new TEvStateStorage::TEvReplicaBoardPublishAck, IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, ev->Cookie); + if (pathSubscribeDataIt != PathToSubscribers.end()) { + SendUpdateToSubscribers(entry, false); + } } - } - bool IsLastEntryOnNode(TOwnerIndex::iterator ownerIt) { - const ui32 ownerNodeId = ownerIt->first.NodeId(); - if (ownerIt != IndexOwner.begin()) { - auto x = ownerIt; - --x; - if (x->first.NodeId() == ownerNodeId) - return false; + if (ev->InterconnectSession) { + auto sessionsIt = Sessions.find(ev->InterconnectSession); + if (sessionsIt == Sessions.end()) { + Send(ev->InterconnectSession, new TEvents::TEvSubscribe, IEventHandle::FlagTrackDelivery); + Sessions[ev->InterconnectSession].Publishers.insert(ev->Sender); + } else { + sessionsIt->second.Publishers.insert(ev->Sender); + } } - ++ownerIt; - if (ownerIt != IndexOwner.end()) { - if (ownerIt->first.NodeId() == ownerNodeId) - return false; - } + auto reply = std::make_unique<TEvStateStorage::TEvReplicaBoardPublishAck>(); + auto resp = std::make_unique<IEventHandle>( + owner, SelfId(), reply.release(), + IEventHandle::FlagTrackDelivery, ev->Cookie); - return true; + if (ev->InterconnectSession) { + resp->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession); + } + TActivationContext::Send(resp.release()); } - void CleanupEntry(ui32 entryIndex) { + void CleanupEntry(ui32 entryIndex, TActorId session) { TEntry &entry = Entries[entryIndex]; + entry.PathIt->second.erase(entryIndex); if (entry.PathIt->second.empty()) { IndexPath.erase(entry.PathIt); } - if (IsLastEntryOnNode(entry.OwnerIt)) { - Send(TActivationContext::InterconnectProxy(entry.OwnerIt->first.NodeId()), new TEvents::TEvUnsubscribe()); - } + CleanupSessionForPublisher(session, entry.OwnerIt->first); + IndexOwner.erase(entry.OwnerIt); TString().swap(entry.Payload); entry.Owner = TActorId(); entry.PathIt = IndexPath.end(); entry.OwnerIt = IndexOwner.end(); + entry.Session = TActorId(); AvailableEntries.emplace_back(entryIndex); } - void PassAway() override { - ui32 prevNode = 0; - for (auto &xpair : IndexOwner) { - const ui32 nodeId = xpair.first.NodeId(); - if (nodeId != prevNode) { - Send(TActivationContext::InterconnectProxy(nodeId), new TEvents::TEvUnsubscribe()); - prevNode = nodeId; + void CleanupSubscriber(const TActorId& subscriber, TActorId session) { + auto subscriberIt = Subscribers.find(subscriber); + if (subscriberIt == Subscribers.end()) { + return; + } + + CleanupSessionForSubscriber(session, subscriber); + + auto& pathSubscribeData = PathToSubscribers[subscriberIt->second.Path]; + pathSubscribeData.Subscribers.erase(subscriberIt->first); + if (pathSubscribeData.Subscribers.empty()) { + PathToSubscribers.erase(subscriberIt->second.Path); + } + Subscribers.erase(subscriberIt); + } + + + void CleanupSessionForSubscriber(const TActorId& session, const TActorId& subscriber) { + if (!session) { + return; + } + auto sessionsIt = Sessions.find(session); + if (sessionsIt != Sessions.end()) { + sessionsIt->second.Subscribers.erase(subscriber); + if (sessionsIt->second.Empty()) { + Send(sessionsIt->first, new TEvents::TEvUnsubscribe()); + Sessions.erase(sessionsIt); } + } + } + + void CleanupSessionForPublisher(const TActorId& session, const TActorId& publisher) { + if (!session) { + return; + } + auto sessionsIt = Sessions.find(session); + if (sessionsIt != Sessions.end()) { + sessionsIt->second.Publishers.erase(publisher); + if (sessionsIt->second.Empty()) { + Send(sessionsIt->first, new TEvents::TEvUnsubscribe()); + Sessions.erase(sessionsIt); + } + } + } + + void PassAway() override { + + for (const auto& [session, sessionSubscribers] : Sessions) { + Send(session, new TEvents::TEvUnsubscribe()); + } + for (const auto& xpair : IndexOwner) { Send(xpair.first, new TEvStateStorage::TEvReplicaShutdown()); } + for (const auto& [path, pathSubscribeData] : PathToSubscribers) { + for (const auto& [subscriber, cookie] : pathSubscribeData.Subscribers) { + auto reply = MakeHolder<TEvStateStorage::TEvReplicaShutdown>(); + Send(subscriber, std::move(reply), 0, cookie); + } + } + // all cleanup in actor destructor TActor::PassAway(); } @@ -146,7 +239,14 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> { if (ownerIt == IndexOwner.end()) // do nothing, already removed? return; - CleanupEntry(ownerIt->second); + const auto& entry = Entries[ownerIt->second]; + const auto& path = entry.PathIt->first; + auto pathSubscribeDataIt = PathToSubscribers.find(path); + if (pathSubscribeDataIt != PathToSubscribers.end()) { + SendUpdateToSubscribers(entry, true); + } + + CleanupEntry(ownerIt->second, ev->InterconnectSession); } void Handle(TEvStateStorage::TEvReplicaBoardLookup::TPtr &ev) { @@ -154,14 +254,36 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> { const auto &path = record.GetPath(); if (record.GetSubscribe()) { - BLOG_ERROR("trying to subscribe on path, must be not implemented yet"); - // reply nothing, request suspicious - return; + auto& pathSubscribeData = PathToSubscribers[path]; + pathSubscribeData.Subscribers[ev->Sender] = ev->Cookie; + auto& subscriber = Subscribers[ev->Sender]; + subscriber.Path = path; + if (ev->InterconnectSession) { + subscriber.Session = ev->InterconnectSession; + auto sessionsIt = Sessions.find(ev->InterconnectSession); + if (sessionsIt == Sessions.end()) { + Send(ev->InterconnectSession, new TEvents::TEvSubscribe, IEventHandle::FlagTrackDelivery); + Sessions[ev->InterconnectSession].Subscribers.insert(ev->Sender); + } else { + sessionsIt->second.Subscribers.insert(ev->Sender); + } + } + } + + ui32 flags = 0; + if (record.GetSubscribe()) { + flags = IEventHandle::FlagTrackDelivery; } auto pathIt = IndexPath.find(path); if (pathIt == IndexPath.end()) { - Send(ev->Sender, new TEvStateStorage::TEvReplicaBoardInfo(path, true), 0, ev->Cookie); + auto reply = std::make_unique<TEvStateStorage::TEvReplicaBoardInfo>(path, true); + auto resp = std::make_unique<IEventHandle>( + ev->Sender, ev->Recipient, reply.release(), flags, ev->Cookie); + if (ev->InterconnectSession) { + resp->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession); + } + TActivationContext::Send(resp.release()); return; } @@ -175,27 +297,130 @@ class TBoardReplicaActor : public TActor<TBoardReplicaActor> { ex->SetPayload(entry.Payload); } - Send(ev->Sender, std::move(reply), 0, ev->Cookie); + auto resp = std::make_unique<IEventHandle>( + ev->Sender, SelfId(), reply.Release(), flags, ev->Cookie); + if (ev->InterconnectSession) { + resp->Rewrite(TEvInterconnect::EvForward, ev->InterconnectSession); + } + TActivationContext::Send(resp.release()); } void Handle(TEvents::TEvUndelivered::TPtr &ev) { - auto ownerIt = IndexOwner.find(ev->Sender); - if (ownerIt == IndexOwner.end()) + auto *msg = ev->Get(); + switch (msg->SourceType) { + case TEvents::TEvSubscribe::EventType: { + DisconnectSession(ev->Sender); + return; + } + case TEvStateStorage::TEvReplicaBoardInfo::EventType: { + auto subscribersIt = Subscribers.find(ev->Sender); + if (subscribersIt == Subscribers.end()) { + return; + } + if (subscribersIt->second.Session != ev->InterconnectSession) { + return; + } + CleanupSubscriber(ev->Sender, ev->InterconnectSession); + break; + } + case TEvStateStorage::TEvReplicaBoardPublishAck::EventType: { + auto ownerIt = IndexOwner.find(ev->Sender); + if (ownerIt == IndexOwner.end()) + return; + + const auto& entry = Entries[ownerIt->second]; + if (entry.Session != ev->InterconnectSession) { + return; + } + const auto& path = entry.PathIt->first; + auto pathSubscribeDataIt = PathToSubscribers.find(path); + if (pathSubscribeDataIt != PathToSubscribers.end()) { + SendUpdateToSubscribers(entry, true); + } + + CleanupEntry(ownerIt->second, ev->InterconnectSession); + break; + } + default: + Y_FAIL("Unexpected case"); + } + } + + void SendUpdateToSubscribers(const TEntry& entry, bool dropped ) { + const auto& path = entry.PathIt->first; + + auto pathSubscribeDataIt = PathToSubscribers.find(path); + if (pathSubscribeDataIt == PathToSubscribers.end()) { return; + } + + auto& pathSubscribeData = pathSubscribeDataIt->second; - CleanupEntry(ownerIt->second); + NKikimrStateStorage::TEvReplicaBoardInfoUpdate record; + auto *info = record.MutableInfo(); + ActorIdToProto(entry.Owner, info->MutableOwner()); + if (dropped) { + info->SetDropped(true); + } else { + info->SetPayload(entry.Payload); + } + + for (const auto& subscriber : pathSubscribeData.Subscribers) { + auto reply = MakeHolder<TEvStateStorage::TEvReplicaBoardInfoUpdate>(path); + reply->Record = record; + Send(subscriber.first, std::move(reply), 0, subscriber.second); + } } void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) { - auto *msg = ev->Get(); - const ui32 nodeId = msg->NodeId; - auto ownerIt = IndexOwner.lower_bound(TActorId(nodeId, 0, 0, 0)); - while (ownerIt != IndexOwner.end() && ownerIt->first.NodeId() == nodeId) { - const ui32 entryToCleanupIndex = ownerIt->second; - ++ownerIt; - CleanupEntry(entryToCleanupIndex); + DisconnectSession(ev->Sender); + } + + void Handle(TEvStateStorage::TEvReplicaBoardUnsubscribe::TPtr& ev) { + const auto& sender = ev->Sender; + + CleanupSubscriber(sender, ev->InterconnectSession); + } + + void DisconnectSession(const TActorId& session) { + auto sessionsIt = Sessions.find(session); + if (sessionsIt == Sessions.end()) { + return; + } + + for (const auto& subscriber : sessionsIt->second.Subscribers) { + auto subscribersIt = Subscribers.find(subscriber); + if (subscribersIt == Subscribers.end()) { + continue; + } + if (subscribersIt->second.Session == session) { + CleanupSubscriber(subscriber, TActorId()); + } + } + + for (const auto& publisher : sessionsIt->second.Publishers) { + auto ownerIt = IndexOwner.find(publisher); + if (ownerIt == IndexOwner.end()) { + continue; + } + + const auto& entry = Entries[ownerIt->second]; + + if (entry.Session != session) { + continue; + } + const auto& path = entry.PathIt->first; + auto pathSubscribeDataIt = PathToSubscribers.find(path); + if (pathSubscribeDataIt != PathToSubscribers.end()) { + SendUpdateToSubscribers(entry, true); + } + + CleanupEntry(ownerIt->second, TActorId()); } + + Sessions.erase(sessionsIt); } + public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::BOARD_REPLICA_ACTOR; @@ -213,6 +438,7 @@ public: hFunc(TEvents::TEvUndelivered, Handle); cFunc(TEvents::TEvPoison::EventType, PassAway); hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + hFunc(TEvStateStorage::TEvReplicaBoardUnsubscribe, Handle); IgnoreFunc(TEvInterconnect::TEvNodeConnected); default: diff --git a/ydb/core/base/board_subscriber_ut.cpp b/ydb/core/base/board_subscriber_ut.cpp new file mode 100644 index 0000000000..82432dde09 --- /dev/null +++ b/ydb/core/base/board_subscriber_ut.cpp @@ -0,0 +1,197 @@ +#include <ydb/core/base/statestorage.h> +#include <ydb/core/base/statestorage_impl.h> + +#include <ydb/core/base/pathid.h> +#include <ydb/core/testlib/basics/appdata.h> +#include <ydb/core/testlib/basics/helpers.h> + +#include <library/cpp/actors/core/log.h> +#include <library/cpp/actors/core/interconnect.h> +#include <library/cpp/actors/interconnect/interconnect_impl.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <ydb/core/testlib/basics/runtime.h> + +#include <util/generic/vector.h> +#include <util/generic/xrange.h> + +namespace NKikimr { + +class TBoardSubscriberTest: public NUnitTest::TTestBase { + + TActorId CreateSubscriber(const TString& path, const TActorId& owner, ui32 nodeIdx) { + const TActorId subscriber = Context->Register( + CreateBoardLookupActor(path, owner, 0, EBoardLookupMode::Subscription), nodeIdx + ); + Context->EnableScheduleForActor(subscriber); + return subscriber; + } + + TActorId CreatePublisher( + const TString& path, const TString& payload, const TActorId& owner, ui32 nodeIdx) { + const TActorId publisher = Context->Register( + CreateBoardPublishActor(path, payload, owner, 0, 0, true), nodeIdx + ); + Context->EnableScheduleForActor(publisher); + return publisher; + } + + void Send( + const TActorId& recipient, + const TActorId& sender, + IEventBase* ev, + ui32 nodeIdx, + bool viaActorSystem = false) { + Context->Send(new IEventHandle(recipient, sender, ev, 0, 0), nodeIdx, viaActorSystem); + } + + void KillPublisher(const TActorId& owner, const TActorId& publisher, ui32 nodeIdx) { + Send(publisher, owner, new TEvents::TEvPoisonPill(), nodeIdx); + } + + void Disconnect(ui32 nodeIndexFrom, ui32 nodeIndexTo) { + const TActorId proxy = Context->GetInterconnectProxy(nodeIndexFrom, nodeIndexTo); + + Send(proxy, TActorId(), new TEvInterconnect::TEvDisconnect(), nodeIndexFrom, true); + + //Wait for event TEvInterconnect::EvNodeDisconnected + TDispatchOptions options; + options.FinalEvents.emplace_back(TEvInterconnect::EvNodeDisconnected); + Context->DispatchEvents(options); + } + +public: + void SetUp() override { + Context = MakeHolder<TTestBasicRuntime>(2); + + for (ui32 i : xrange(Context->GetNodeCount())) { + SetupStateStorage(*Context, i, 0); + } + + Context->Initialize(TAppPrepare().Unwrap()); + } + + void TearDown() override { + Context.Reset(); + } + + UNIT_TEST_SUITE(TBoardSubscriberTest); + UNIT_TEST(SimpleSubscriber); + UNIT_TEST(ManySubscribersManyPublisher); + UNIT_TEST(DisconnectReplica); + UNIT_TEST_SUITE_END(); + + void SimpleSubscriber(); + void ManySubscribersManyPublisher(); + void DisconnectReplica(); + +private: + THolder<TTestBasicRuntime> Context; +}; + +UNIT_TEST_SUITE_REGISTRATION(TBoardSubscriberTest); + +void TBoardSubscriberTest::SimpleSubscriber() { + const auto edgeSubscriber = Context->AllocateEdgeActor(0); + CreateSubscriber("path", edgeSubscriber, 0); + + { + auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber); + UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); + UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); + UNIT_ASSERT(event->Get()->InfoEntries.empty()); + } + + const auto edgePublisher = Context->AllocateEdgeActor(1); + const TActorId publisher = CreatePublisher("path", "test", edgePublisher, 1); + + { + auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber); + UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); + UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); + const auto &update = event->Get()->Update; + + UNIT_ASSERT(!update.Dropped); + UNIT_ASSERT_EQUAL(update.Owner, publisher); + UNIT_ASSERT_EQUAL(update.Payload, "test"); + } + + KillPublisher(edgePublisher, publisher, 1); + + { + auto event = Context->GrabEdgeEventIf<TEvStateStorage::TEvBoardInfoUpdate>( + {edgeSubscriber}, [edgeSubscriber](const auto& ev){ + const auto &update = ev->Get()->Update; + if (ev->Recipient == edgeSubscriber && update.Dropped) { + return true; + } + return false; + }); + UNIT_ASSERT(event); + UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); + UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); + const auto &update = event->Get()->Update; + + UNIT_ASSERT(update.Dropped); + UNIT_ASSERT_EQUAL(update.Owner, publisher); + } +} + +void TBoardSubscriberTest::ManySubscribersManyPublisher() { + size_t subscribersCount = 10; + size_t publishersCount = 10; + + THashSet<TActorId> subscribers; + + for (size_t i = 0; i < subscribersCount; i++) { + const auto edgeSubscriber = Context->AllocateEdgeActor(i % 2); + CreateSubscriber("path", edgeSubscriber, i % 2); + subscribers.insert(edgeSubscriber); + { + auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber); + UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); + UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); + UNIT_ASSERT(event->Get()->InfoEntries.empty()); + } + } + + for (size_t i = 0; i < publishersCount; i++) { + const auto edgePublisher = Context->AllocateEdgeActor(i % 2); + const auto publisher = CreatePublisher("path", ToString(i), edgePublisher, i % 2); + for (const auto& subscriber : subscribers) { + auto event = Context->GrabEdgeEventIf<TEvStateStorage::TEvBoardInfoUpdate>( + {subscriber}, [publisher](const auto& ev) mutable { + if (ev->Get()->Update.Owner == publisher) { + return true; + } + return false; + }); + UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); + UNIT_ASSERT_EQUAL(event->Get()->Update.Payload, ToString(i)); + } + } +} + +void TBoardSubscriberTest::DisconnectReplica() { + + std::vector<TActorId> replicas; + + { + const auto edgeSubscriber = Context->AllocateEdgeActor(1); + CreateSubscriber("path", edgeSubscriber, 1); + + { + auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfo>(edgeSubscriber); + UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::Ok); + UNIT_ASSERT_EQUAL(event->Get()->Path, "path"); + UNIT_ASSERT(event->Get()->InfoEntries.empty()); + } + + Disconnect(1, 0); + + auto event = Context->GrabEdgeEvent<TEvStateStorage::TEvBoardInfoUpdate>(edgeSubscriber); + UNIT_ASSERT_EQUAL(event->Get()->Status, TEvStateStorage::TEvBoardInfo::EStatus::NotAvailable); + } +} + +} // NKikimr diff --git a/ydb/core/base/statestorage.h b/ydb/core/base/statestorage.h index df49ba869a..f3a7c91608 100644 --- a/ydb/core/base/statestorage.h +++ b/ydb/core/base/statestorage.h @@ -26,6 +26,7 @@ struct TEvStateStorage { EvListSchemeBoard, // all EvUpdateGroupConfig, EvListStateStorage, + EvBoardInfoUpdate, // replies (local, from proxy) EvInfo = EvLookup + 512, @@ -54,9 +55,11 @@ struct TEvStateStorage { EvReplicaBoardPublish = EvLock + 4 * 512, EvReplicaBoardLookup, EvReplicaBoardCleanup, + EvReplicaBoardUnsubscribe, EvReplicaBoardPublishAck = EvLock + 5 * 512, EvReplicaBoardInfo, + EvReplicaBoardInfoUpdate, EvReplicaProbeSubscribe = EvLock + 6 * 512, EvReplicaProbeUnsubscribe, @@ -371,8 +374,10 @@ struct TEvStateStorage { struct TEvReplicaBoardPublish; struct TEvReplicaBoardLookup; struct TEvReplicaBoardCleanup; + struct TEvReplicaBoardUnsubscribe; struct TEvReplicaBoardPublishAck; struct TEvReplicaBoardInfo; + struct TEvReplicaBoardInfoUpdate; struct TEvListSchemeBoard; struct TEvListSchemeBoardResult; struct TEvListStateStorage; @@ -450,6 +455,30 @@ struct TEvStateStorage { , InfoEntries(x.InfoEntries) {} }; + + struct TEvBoardInfoUpdate : public TEventLocal<TEvBoardInfoUpdate, EvBoardInfoUpdate> { + + struct TInfoEntryUpdate { + TActorId Owner; + TString Payload; + bool Dropped = false; + }; + + const TEvBoardInfo::EStatus Status; + const TString Path; + TInfoEntryUpdate Update; + + TEvBoardInfoUpdate(TEvBoardInfo::EStatus status, const TString &path) + : Status(status) + , Path(path) + {} + + TEvBoardInfoUpdate(const TEvBoardInfoUpdate &x) + : Status(x.Status) + , Path(x.Path) + , Update(x.Update) + {} + }; }; struct TStateStorageInfo : public TThrRefBase { @@ -512,6 +541,7 @@ enum class EBoardLookupMode { FirstNonEmptyDoubleTime, SecondNonEmptyDoubleTime, MajorityDoubleTime, + Subscription, }; TIntrusivePtr<TStateStorageInfo> BuildStateStorageInfo(char (&namePrefix)[TActorId::MaxServiceIDLength], const NKikimrConfig::TDomainsConfig::TStateStorage& config); @@ -530,7 +560,7 @@ IActor* CreateStateStorageTabletGuardian(ui64 tabletId, const TActorId &leader, IActor* CreateStateStorageFollowerGuardian(ui64 tabletId, const TActorId &follower); // created as followerCandidate IActor* CreateStateStorageBoardReplica(const TIntrusivePtr<TStateStorageInfo> &, ui32); IActor* CreateSchemeBoardReplica(const TIntrusivePtr<TStateStorageInfo>&, ui32); -IActor* CreateBoardLookupActor(const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode, bool sub, bool useNodeSubscriptions); +IActor* CreateBoardLookupActor(const TString &path, const TActorId &owner, ui32 groupId, EBoardLookupMode mode); IActor* CreateBoardPublishActor(const TString &path, const TString &payload, const TActorId &owner, ui32 groupId, ui32 ttlMs, bool reg); TString MakeEndpointsBoardPath(const TString &database); diff --git a/ydb/core/base/statestorage_impl.h b/ydb/core/base/statestorage_impl.h index 8951dc6c4b..669518eea9 100644 --- a/ydb/core/base/statestorage_impl.h +++ b/ydb/core/base/statestorage_impl.h @@ -305,12 +305,18 @@ struct TEvStateStorage::TEvReplicaBoardCleanup : public TEventPB<TEvStateStorage {} }; +struct TEvStateStorage::TEvReplicaBoardUnsubscribe : public TEventPB<TEvStateStorage::TEvReplicaBoardUnsubscribe, NKikimrStateStorage::TEvReplicaBoardUnsubscribe, TEvStateStorage::EvReplicaBoardUnsubscribe> { + TEvReplicaBoardUnsubscribe() + {} +}; + struct TEvStateStorage::TEvReplicaBoardPublishAck : public TEventPB<TEvStateStorage::TEvReplicaBoardPublishAck, NKikimrStateStorage::TEvReplicaBoardPublishAck, TEvStateStorage::EvReplicaBoardPublishAck> { TEvReplicaBoardPublishAck() {} }; struct TEvStateStorage::TEvReplicaBoardInfo : public TEventPB<TEvStateStorage::TEvReplicaBoardInfo, NKikimrStateStorage::TEvReplicaBoardInfo, TEvStateStorage::EvReplicaBoardInfo> { + TEvReplicaBoardInfo() {} @@ -321,6 +327,17 @@ struct TEvStateStorage::TEvReplicaBoardInfo : public TEventPB<TEvStateStorage::T } }; +struct TEvStateStorage::TEvReplicaBoardInfoUpdate : public TEventPB<TEvStateStorage::TEvReplicaBoardInfoUpdate, NKikimrStateStorage::TEvReplicaBoardInfoUpdate, TEvStateStorage::EvReplicaBoardInfoUpdate> { + + TEvReplicaBoardInfoUpdate() + {} + + TEvReplicaBoardInfoUpdate(const TString &path) + { + Record.SetPath(path); + } +}; + IActor* CreateStateStorageReplicaProbe(TActorId replica); } diff --git a/ydb/core/base/ut_board_subscriber/CMakeLists.darwin-x86_64.txt b/ydb/core/base/ut_board_subscriber/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..b6884fef03 --- /dev/null +++ b/ydb/core/base/ut_board_subscriber/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,82 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-base-ut_board_subscriber) +target_compile_options(ydb-core-base-ut_board_subscriber PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-base-ut_board_subscriber PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/base +) +target_link_libraries(ydb-core-base-ut_board_subscriber PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-core-base + cpp-actors-interconnect + cpp-actors-core + cpp-testing-unittest + core-testlib-basics + testlib-basics-default +) +target_link_options(ydb-core-base-ut_board_subscriber PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-core-base-ut_board_subscriber PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/base/board_subscriber_ut.cpp +) +set_property( + TARGET + ydb-core-base-ut_board_subscriber + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-core-base-ut_board_subscriber + TEST_TARGET + ydb-core-base-ut_board_subscriber + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-base-ut_board_subscriber + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-base-ut_board_subscriber + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-base-ut_board_subscriber + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-core-base-ut_board_subscriber + system_allocator +) +vcs_info(ydb-core-base-ut_board_subscriber) diff --git a/ydb/core/base/ut_board_subscriber/CMakeLists.linux-aarch64.txt b/ydb/core/base/ut_board_subscriber/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..b003f94e45 --- /dev/null +++ b/ydb/core/base/ut_board_subscriber/CMakeLists.linux-aarch64.txt @@ -0,0 +1,85 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-base-ut_board_subscriber) +target_compile_options(ydb-core-base-ut_board_subscriber PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-base-ut_board_subscriber PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/base +) +target_link_libraries(ydb-core-base-ut_board_subscriber PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + cpp-testing-unittest_main + ydb-core-base + cpp-actors-interconnect + cpp-actors-core + cpp-testing-unittest + core-testlib-basics + testlib-basics-default +) +target_link_options(ydb-core-base-ut_board_subscriber PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-base-ut_board_subscriber PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/base/board_subscriber_ut.cpp +) +set_property( + TARGET + ydb-core-base-ut_board_subscriber + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-core-base-ut_board_subscriber + TEST_TARGET + ydb-core-base-ut_board_subscriber + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-base-ut_board_subscriber + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-base-ut_board_subscriber + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-base-ut_board_subscriber + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-core-base-ut_board_subscriber + cpp-malloc-jemalloc +) +vcs_info(ydb-core-base-ut_board_subscriber) diff --git a/ydb/core/base/ut_board_subscriber/CMakeLists.linux-x86_64.txt b/ydb/core/base/ut_board_subscriber/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..547fbd0857 --- /dev/null +++ b/ydb/core/base/ut_board_subscriber/CMakeLists.linux-x86_64.txt @@ -0,0 +1,87 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-base-ut_board_subscriber) +target_compile_options(ydb-core-base-ut_board_subscriber PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-base-ut_board_subscriber PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/base +) +target_link_libraries(ydb-core-base-ut_board_subscriber PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-core-base + cpp-actors-interconnect + cpp-actors-core + cpp-testing-unittest + core-testlib-basics + testlib-basics-default +) +target_link_options(ydb-core-base-ut_board_subscriber PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-core-base-ut_board_subscriber PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/base/board_subscriber_ut.cpp +) +set_property( + TARGET + ydb-core-base-ut_board_subscriber + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-core-base-ut_board_subscriber + TEST_TARGET + ydb-core-base-ut_board_subscriber + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-base-ut_board_subscriber + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-base-ut_board_subscriber + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-base-ut_board_subscriber + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-core-base-ut_board_subscriber + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +vcs_info(ydb-core-base-ut_board_subscriber) diff --git a/ydb/core/base/ut_board_subscriber/CMakeLists.txt b/ydb/core/base/ut_board_subscriber/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/base/ut_board_subscriber/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/base/ut_board_subscriber/CMakeLists.windows-x86_64.txt b/ydb/core/base/ut_board_subscriber/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..ac56b91954 --- /dev/null +++ b/ydb/core/base/ut_board_subscriber/CMakeLists.windows-x86_64.txt @@ -0,0 +1,75 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-core-base-ut_board_subscriber) +target_compile_options(ydb-core-base-ut_board_subscriber PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-core-base-ut_board_subscriber PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/base +) +target_link_libraries(ydb-core-base-ut_board_subscriber PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-core-base + cpp-actors-interconnect + cpp-actors-core + cpp-testing-unittest + core-testlib-basics + testlib-basics-default +) +target_sources(ydb-core-base-ut_board_subscriber PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/base/board_subscriber_ut.cpp +) +set_property( + TARGET + ydb-core-base-ut_board_subscriber + PROPERTY + SPLIT_FACTOR + 10 +) +add_yunittest( + NAME + ydb-core-base-ut_board_subscriber + TEST_TARGET + ydb-core-base-ut_board_subscriber + TEST_ARG + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +set_yunittest_property( + TEST + ydb-core-base-ut_board_subscriber + PROPERTY + LABELS + MEDIUM +) +set_yunittest_property( + TEST + ydb-core-base-ut_board_subscriber + PROPERTY + PROCESSORS + 1 +) +set_yunittest_property( + TEST + ydb-core-base-ut_board_subscriber + PROPERTY + TIMEOUT + 600 +) +target_allocator(ydb-core-base-ut_board_subscriber + system_allocator +) +vcs_info(ydb-core-base-ut_board_subscriber) diff --git a/ydb/core/discovery/discovery.cpp b/ydb/core/discovery/discovery.cpp index 70436d4853..a124bcecda 100644 --- a/ydb/core/discovery/discovery.cpp +++ b/ydb/core/discovery/discovery.cpp @@ -206,7 +206,7 @@ namespace NDiscoveryPrivate { if (result.second) { CLOG_D("Lookup" << ": path# " << database); - Register(CreateBoardLookupActor(database, SelfId(), groupId, EBoardLookupMode::Second, false, false)); + Register(CreateBoardLookupActor(database, SelfId(), groupId, EBoardLookupMode::Second)); } return result.first; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index ebcd32c338..477628da90 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -742,7 +742,7 @@ public: } if (PublishBoardPath) { - auto actor = CreateBoardLookupActor(PublishBoardPath, SelfId(), *groupId, EBoardLookupMode::Majority, false, false); + auto actor = CreateBoardLookupActor(PublishBoardPath, SelfId(), *groupId, EBoardLookupMode::Majority); BoardLookupActor = Register(actor); } } diff --git a/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp b/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp index 6448d0482e..b7ea80d247 100644 --- a/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp +++ b/ydb/core/kqp/rm_service/kqp_resource_tracker.cpp @@ -40,8 +40,7 @@ public: , Callback(std::move(callback)) {} void Bootstrap() { - auto boardLookup = CreateBoardLookupActor(BoardPath, SelfId(), StateStorageGroupId, EBoardLookupMode::Majority, - false, false); + auto boardLookup = CreateBoardLookupActor(BoardPath, SelfId(), StateStorageGroupId, EBoardLookupMode::Majority); BoardLookupId = Register(boardLookup); Become(&TTakeResourcesSnapshotActor::WorkState); diff --git a/ydb/core/load_test/service_actor.cpp b/ydb/core/load_test/service_actor.cpp index df8ebdc2e8..3b5c0ea337 100644 --- a/ydb/core/load_test/service_actor.cpp +++ b/ydb/core/load_test/service_actor.cpp @@ -703,9 +703,7 @@ public: RegisterWithSameMailbox(CreateBoardLookupActor(MakeEndpointsBoardPath(name), SelfId(), domainInfo->DefaultStateStorageGroup, - EBoardLookupMode::Second, - false, - false)); + EBoardLookupMode::Second)); tag = modifiedRequest.GetTag(); uuid = modifiedRequest.GetUuid(); } diff --git a/ydb/core/mind/tenant_node_enumeration.cpp b/ydb/core/mind/tenant_node_enumeration.cpp index e40380a925..045cefe738 100644 --- a/ydb/core/mind/tenant_node_enumeration.cpp +++ b/ydb/core/mind/tenant_node_enumeration.cpp @@ -99,7 +99,7 @@ public: return ReportErrorAndDie(); const TString path = MakeTenantNodeEnumerationPath(TenantName); - LookupActor = Register(CreateBoardLookupActor(path, SelfId(), statestorageGroupId, EBoardLookupMode::Majority, false, false)); + LookupActor = Register(CreateBoardLookupActor(path, SelfId(), statestorageGroupId, EBoardLookupMode::Majority)); Become(&TThis::StateWait); } diff --git a/ydb/core/protos/statestorage.proto b/ydb/core/protos/statestorage.proto index 4165af89ba..d746ca2917 100644 --- a/ydb/core/protos/statestorage.proto +++ b/ydb/core/protos/statestorage.proto @@ -103,16 +103,25 @@ message TEvReplicaBoardLookup { message TEvReplicaBoardCleanup { }; +message TEvReplicaBoardUnsubscribe { +}; + message TBoardEntryInfo { optional NActorsProto.TActorId Owner = 1; optional bytes Payload = 2; + optional bool Dropped = 3 [default = false]; }; message TEvReplicaBoardInfo { optional string Path = 1; optional bool Dropped = 2; repeated TBoardEntryInfo Info = 3; -} +}; + +message TEvReplicaBoardInfoUpdate { + optional string Path = 1; + optional TBoardEntryInfo Info = 3; +}; message TEndpointBoardEntry { optional string Address = 1; diff --git a/ydb/core/viewer/json_pipe_req.h b/ydb/core/viewer/json_pipe_req.h index 4237bc4a29..8946e3d5a1 100644 --- a/ydb/core/viewer/json_pipe_req.h +++ b/ydb/core/viewer/json_pipe_req.h @@ -194,9 +194,7 @@ protected: TBase::RegisterWithSameMailbox(CreateBoardLookupActor(MakeEndpointsBoardPath(path), TBase::SelfId(), domainInfo->DefaultStateStorageGroup, - EBoardLookupMode::Second, - false, - false)); + EBoardLookupMode::Second)); ++Requests; } |