diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/actors/interconnect/interconnect_nameserver_dynamic.cpp | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/actors/interconnect/interconnect_nameserver_dynamic.cpp')
-rw-r--r-- | library/cpp/actors/interconnect/interconnect_nameserver_dynamic.cpp | 178 |
1 files changed, 178 insertions, 0 deletions
diff --git a/library/cpp/actors/interconnect/interconnect_nameserver_dynamic.cpp b/library/cpp/actors/interconnect/interconnect_nameserver_dynamic.cpp new file mode 100644 index 0000000000..5e48401b14 --- /dev/null +++ b/library/cpp/actors/interconnect/interconnect_nameserver_dynamic.cpp @@ -0,0 +1,178 @@ +#include "interconnect.h" +#include "interconnect_impl.h" +#include "interconnect_address.h" +#include "interconnect_nameserver_base.h" +#include "events_local.h" +#include "logging.h" + +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/log.h> + +namespace NActors { + + class TInterconnectDynamicNameserver + : public TInterconnectNameserverBase<TInterconnectDynamicNameserver> + , public TInterconnectLoggingBase + { + struct TPendingRequest { + TEvInterconnect::TEvResolveNode::TPtr Request; + TInstant Deadline; + + TPendingRequest(TEvInterconnect::TEvResolveNode::TPtr request, const TInstant& deadline) + : Request(request), Deadline(deadline) + { + } + }; + + TMap<ui32, TTableNameserverSetup::TNodeInfo> NodeTable; + TVector<TPendingRequest> PendingRequests; + TDuration PendingPeriod; + + void PrintInfo() { + TString logMsg = TStringBuilder() << "Table size: " << NodeTable.size(); + for (const auto& [nodeId, node] : NodeTable) { + TString str = TStringBuilder() << "\n > Node " << nodeId << " `" << node.Address << "`:" << node.Port << ", host: " << node.Host << ", resolveHost: " << node.ResolveHost; + logMsg += str; + } + LOG_DEBUG_IC("ICN01", "%s", logMsg.c_str()); + } + + bool IsNodeUpdated(const ui32 nodeId, const TString& address, const ui32 port) { + bool printInfo = false; + auto it = NodeTable.find(nodeId); + if (it == NodeTable.end()) { + LOG_DEBUG_IC("ICN02", "New node %u `%s`: %u", + nodeId, address.c_str(), port); + printInfo = true; + } else if (it->second.Address != address || it->second.Port != port) { + LOG_DEBUG_IC("ICN03", "Updated node %u `%s`: %u (from `%s`: %u)", + nodeId, address.c_str(), port, it->second.Address.c_str(), it->second.Port); + printInfo = true; + Send(TActivationContext::InterconnectProxy(nodeId), new TEvInterconnect::TEvDisconnect); + } + return printInfo; + } + + void DiscardTimedOutRequests(const TActorContext& ctx, ui32 compactionCount = 0) { + + auto now = Now(); + + for (auto& pending : PendingRequests) { + if (pending.Deadline > now) { + LOG_ERROR_IC("ICN06", "Unknown nodeId: %u", pending.Request->Get()->Record.GetNodeId()); + auto reply = new TEvLocalNodeInfo; + reply->NodeId = pending.Request->Get()->Record.GetNodeId(); + ctx.Send(pending.Request->Sender, reply); + pending.Request.Reset(); + compactionCount++; + } + } + + if (compactionCount) { + TVector<TPendingRequest> requests; + if (compactionCount < PendingRequests.size()) { // sanity check + requests.reserve(PendingRequests.size() - compactionCount); + } + for (auto& pending : PendingRequests) { + if (pending.Request) { + requests.emplace_back(pending.Request, pending.Deadline); + } + } + PendingRequests.swap(requests); + } + } + + void SchedulePeriodic() { + Schedule(TDuration::MilliSeconds(200), new TEvents::TEvWakeup()); + } + + public: + static constexpr EActivityType ActorActivityType() { + return NAMESERVICE; + } + + TInterconnectDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup>& setup, const TDuration& pendingPeriod, ui32 /*resolvePoolId*/ ) + : TInterconnectNameserverBase<TInterconnectDynamicNameserver>(&TInterconnectDynamicNameserver::StateFunc, NodeTable) + , NodeTable(setup->StaticNodeTable) + , PendingPeriod(pendingPeriod) + { + Y_VERIFY(setup->IsEntriesUnique()); + } + + STFUNC(StateFunc) { + try { + switch (ev->GetTypeRewrite()) { + HFunc(TEvInterconnect::TEvResolveNode, Handle); + HFunc(TEvResolveAddress, Handle); + HFunc(TEvInterconnect::TEvListNodes, Handle); + HFunc(TEvInterconnect::TEvGetNode, Handle); + HFunc(TEvInterconnect::TEvNodesInfo, HandleUpdate); + CFunc(TEvents::TEvWakeup::EventType, HandlePeriodic); + } + } catch (...) { + LOG_ERROR_IC("ICN09", "%s", CurrentExceptionMessage().c_str()); + } + } + + void HandleMissedNodeId(TEvInterconnect::TEvResolveNode::TPtr& ev, + const TActorContext& ctx, + const TInstant& deadline) { + if (PendingPeriod) { + if (PendingRequests.size() == 0) { + SchedulePeriodic(); + } + PendingRequests.emplace_back(std::move(ev), Min(deadline, Now() + PendingPeriod)); + } else { + LOG_ERROR_IC("ICN07", "Unknown nodeId: %u", ev->Get()->Record.GetNodeId()); + TInterconnectNameserverBase::HandleMissedNodeId(ev, ctx, deadline); + } + } + + void HandleUpdate(TEvInterconnect::TEvNodesInfo::TPtr& ev, + const TActorContext& ctx) { + + auto request = ev->Get(); + LOG_DEBUG_IC("ICN04", "Update TEvNodesInfo with sz: %lu ", request->Nodes.size()); + + bool printInfo = false; + ui32 compactionCount = 0; + + for (const auto& node : request->Nodes) { + printInfo |= IsNodeUpdated(node.NodeId, node.Address, node.Port); + + NodeTable[node.NodeId] = TTableNameserverSetup::TNodeInfo( + node.Address, node.Host, node.ResolveHost, node.Port, node.Location); + + for (auto& pending : PendingRequests) { + if (pending.Request->Get()->Record.GetNodeId() == node.NodeId) { + LOG_DEBUG_IC("ICN05", "Pending nodeId: %u discovered", node.NodeId); + RegisterWithSameMailbox( + CreateResolveActor(node.NodeId, NodeTable[node.NodeId], pending.Request->Sender, SelfId(), pending.Deadline)); + pending.Request.Reset(); + compactionCount++; + } + } + } + + if (printInfo) { + PrintInfo(); + } + + DiscardTimedOutRequests(ctx, compactionCount); + } + + void HandlePeriodic(const TActorContext& ctx) { + DiscardTimedOutRequests(ctx, 0); + if (PendingRequests.size()) { + SchedulePeriodic(); + } + } + }; + + IActor* CreateDynamicNameserver(const TIntrusivePtr<TTableNameserverSetup>& setup, + const TDuration& pendingPeriod, + ui32 poolId) { + return new TInterconnectDynamicNameserver(setup, pendingPeriod, poolId); + } + +} |