aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/examples/02_discovery
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-06-02 18:26:46 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-06-02 18:32:58 +0300
commit7e7de263d4acbc6eacf92b618bcb5f9049bccc9b (patch)
treed25ff63925cbcc5e0f80fe4d1514b4cb48a9f686 /library/cpp/actors/examples/02_discovery
parent10ba5cc0c3d130ce4b33d307d265b937dd572c39 (diff)
downloadydb-7e7de263d4acbc6eacf92b618bcb5f9049bccc9b.tar.gz
add library/cpp/actors to github export
x-stable-origin-commit: 0e951cfb44430a0ed33bec779c8a790f73c31b91
Diffstat (limited to 'library/cpp/actors/examples/02_discovery')
-rw-r--r--library/cpp/actors/examples/02_discovery/CMakeLists.darwin.txt53
-rw-r--r--library/cpp/actors/examples/02_discovery/CMakeLists.linux-aarch64.txt55
-rw-r--r--library/cpp/actors/examples/02_discovery/CMakeLists.linux.txt56
-rw-r--r--library/cpp/actors/examples/02_discovery/CMakeLists.txt15
-rw-r--r--library/cpp/actors/examples/02_discovery/endpoint.cpp118
-rw-r--r--library/cpp/actors/examples/02_discovery/lookup.cpp134
-rw-r--r--library/cpp/actors/examples/02_discovery/main.cpp136
-rw-r--r--library/cpp/actors/examples/02_discovery/protocol.proto19
-rw-r--r--library/cpp/actors/examples/02_discovery/publish.cpp113
-rw-r--r--library/cpp/actors/examples/02_discovery/replica.cpp182
-rw-r--r--library/cpp/actors/examples/02_discovery/services.h85
11 files changed, 966 insertions, 0 deletions
diff --git a/library/cpp/actors/examples/02_discovery/CMakeLists.darwin.txt b/library/cpp/actors/examples/02_discovery/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..2eaa6534d8
--- /dev/null
+++ b/library/cpp/actors/examples/02_discovery/CMakeLists.darwin.txt
@@ -0,0 +1,53 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(example_02_discovery)
+target_link_libraries(example_02_discovery PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-lfalloc
+ library-cpp-cpuid_check
+ cpp-actors-core
+ cpp-actors-dnsresolver
+ cpp-actors-interconnect
+ cpp-actors-http
+ contrib-libs-protobuf
+)
+target_link_options(example_02_discovery PRIVATE
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_proto_messages(example_02_discovery PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/protocol.proto
+)
+target_sources(example_02_discovery PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/endpoint.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/lookup.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/main.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/publish.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/replica.cpp
+)
+target_proto_addincls(example_02_discovery
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(example_02_discovery
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
+vcs_info(example_02_discovery)
diff --git a/library/cpp/actors/examples/02_discovery/CMakeLists.linux-aarch64.txt b/library/cpp/actors/examples/02_discovery/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..1883d9ddad
--- /dev/null
+++ b/library/cpp/actors/examples/02_discovery/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,55 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(example_02_discovery)
+target_link_libraries(example_02_discovery PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-lfalloc
+ cpp-actors-core
+ cpp-actors-dnsresolver
+ cpp-actors-interconnect
+ cpp-actors-http
+ contrib-libs-protobuf
+)
+target_link_options(example_02_discovery PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_proto_messages(example_02_discovery PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/protocol.proto
+)
+target_sources(example_02_discovery PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/endpoint.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/lookup.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/main.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/publish.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/replica.cpp
+)
+target_proto_addincls(example_02_discovery
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(example_02_discovery
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
+vcs_info(example_02_discovery)
diff --git a/library/cpp/actors/examples/02_discovery/CMakeLists.linux.txt b/library/cpp/actors/examples/02_discovery/CMakeLists.linux.txt
new file mode 100644
index 0000000000..855362b4ab
--- /dev/null
+++ b/library/cpp/actors/examples/02_discovery/CMakeLists.linux.txt
@@ -0,0 +1,56 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(example_02_discovery)
+target_link_libraries(example_02_discovery PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-lfalloc
+ library-cpp-cpuid_check
+ cpp-actors-core
+ cpp-actors-dnsresolver
+ cpp-actors-interconnect
+ cpp-actors-http
+ contrib-libs-protobuf
+)
+target_link_options(example_02_discovery PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_proto_messages(example_02_discovery PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/protocol.proto
+)
+target_sources(example_02_discovery PRIVATE
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/endpoint.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/lookup.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/main.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/publish.cpp
+ ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/replica.cpp
+)
+target_proto_addincls(example_02_discovery
+ ./
+ ${CMAKE_SOURCE_DIR}/
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+ ${CMAKE_BINARY_DIR}
+ ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src
+)
+target_proto_outs(example_02_discovery
+ --cpp_out=${CMAKE_BINARY_DIR}/
+ --cpp_styleguide_out=${CMAKE_BINARY_DIR}/
+)
+vcs_info(example_02_discovery)
diff --git a/library/cpp/actors/examples/02_discovery/CMakeLists.txt b/library/cpp/actors/examples/02_discovery/CMakeLists.txt
new file mode 100644
index 0000000000..3e0811fb22
--- /dev/null
+++ b/library/cpp/actors/examples/02_discovery/CMakeLists.txt
@@ -0,0 +1,15 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/library/cpp/actors/examples/02_discovery/endpoint.cpp b/library/cpp/actors/examples/02_discovery/endpoint.cpp
new file mode 100644
index 0000000000..97780e8b4c
--- /dev/null
+++ b/library/cpp/actors/examples/02_discovery/endpoint.cpp
@@ -0,0 +1,118 @@
+#include "services.h"
+
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/protos/services_common.pb.h>
+
+#include <library/cpp/actors/http/http.h>
+#include <library/cpp/actors/http/http_proxy.h>
+
+#include <util/system/hostname.h>
+#include <util/string/builder.h>
+
+class TExampleHttpRequest : public TActor<TExampleHttpRequest> {
+ TIntrusivePtr<TExampleStorageConfig> Config;
+ const TString PublishKey;
+
+ TActorId HttpProxy;
+ NHttp::THttpIncomingRequestPtr Request;
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr &ev) {
+ Request = std::move(ev->Get()->Request);
+ HttpProxy = ev->Sender;
+
+ Register(CreateLookupActor(Config.Get(), PublishKey, SelfId()));
+ }
+
+ void Handle(TEvExample::TEvInfo::TPtr &ev) {
+ auto *msg = ev->Get();
+
+ TStringBuilder body;
+ for (const auto &x : msg->Payloads)
+ body << x << Endl;
+
+ auto response = Request->CreateResponseOK(body, "application/text; charset=utf-8");
+ Send(HttpProxy, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response));
+
+ PassAway();
+ }
+public:
+ static constexpr IActor::EActivityType ActorActivityType() {
+ // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon
+ return EActorActivity::ACTORLIB_COMMON;
+ }
+
+ TExampleHttpRequest(TExampleStorageConfig *config, const TString &publishKey)
+ : TActor(&TThis::StateWork)
+ , Config(config)
+ , PublishKey(publishKey)
+ {}
+
+ STFUNC(StateWork) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
+ hFunc(TEvExample::TEvInfo, Handle);
+ }
+ }
+};
+
+class TExampleHttpEndpoint : public TActorBootstrapped<TExampleHttpEndpoint> {
+ TIntrusivePtr<TExampleStorageConfig> Config;
+ const TString PublishKey;
+ const ui16 HttpPort;
+
+ TActorId PublishActor;
+ TActorId HttpProxy;
+
+ std::shared_ptr<NMonitoring::TMetricRegistry> SensorsRegistry = std::make_shared<NMonitoring::TMetricRegistry>();
+
+ void PassAway() override {
+ Send(PublishActor, new TEvents::TEvPoison());
+ Send(HttpProxy, new TEvents::TEvPoison());
+
+ return TActor::PassAway();
+ }
+
+ void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr &ev) {
+ const TActorId reqActor = Register(new TExampleHttpRequest(Config.Get(), PublishKey));
+ TlsActivationContext->Send(ev->Forward(reqActor));
+ }
+
+public:
+ static constexpr IActor::EActivityType ActorActivityType() {
+ // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon
+ return EActorActivity::ACTORLIB_COMMON;
+ }
+
+ TExampleHttpEndpoint(TExampleStorageConfig *config, const TString &publishKey, ui16 port)
+ : Config(config)
+ , PublishKey(publishKey)
+ , HttpPort(port)
+ {
+ }
+
+ void Bootstrap() {
+ const TString publishPayload = ToString(HttpPort);
+ PublishActor = Register(CreatePublishActor(Config.Get(), PublishKey, publishPayload));
+ HttpProxy = Register(NHttp::CreateHttpProxy(SensorsRegistry));
+
+ Send(HttpProxy, new NHttp::TEvHttpProxy::TEvAddListeningPort(HttpPort, FQDNHostName()));
+ Send(HttpProxy, new NHttp::TEvHttpProxy::TEvRegisterHandler("/list", SelfId()));
+
+ Become(&TThis::StateWork);
+ }
+
+ STFUNC(StateWork) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle);
+ default:
+ break;
+ }
+ }
+};
+
+IActor* CreateEndpointActor(TExampleStorageConfig *config, const TString &publishKey, ui16 port) {
+ return new TExampleHttpEndpoint(config, publishKey, port);
+}
diff --git a/library/cpp/actors/examples/02_discovery/lookup.cpp b/library/cpp/actors/examples/02_discovery/lookup.cpp
new file mode 100644
index 0000000000..979a38d215
--- /dev/null
+++ b/library/cpp/actors/examples/02_discovery/lookup.cpp
@@ -0,0 +1,134 @@
+#include "services.h"
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <util/generic/set.h>
+#include <util/generic/vector.h>
+
+class TExampleLookupRequestActor : public TActor<TExampleLookupRequestActor> {
+ const TActorId Owner;
+ const TActorId Replica;
+ const TString Key;
+
+ void Registered(TActorSystem* sys, const TActorId&) override {
+ const auto flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession;
+ sys->Send(new IEventHandle(Replica, SelfId(), new TEvExample::TEvReplicaLookup(Key), flags));
+ }
+
+ void PassAway() override {
+ const ui32 replicaNode = Replica.NodeId();
+ if (replicaNode != SelfId().NodeId()) {
+ const TActorId &interconnectProxy = TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(Replica.NodeId());
+ Send(interconnectProxy, new TEvents::TEvUnsubscribe());
+ }
+ return IActor::PassAway();
+ }
+
+ void Handle(TEvExample::TEvReplicaInfo::TPtr &ev) {
+ Send(Owner, ev->Release().Release());
+ return PassAway();
+ }
+
+ void HandleUndelivered() {
+ Send(Owner, new TEvExample::TEvReplicaInfo(Key));
+ return PassAway();
+ }
+public:
+ static constexpr IActor::EActivityType ActorActivityType() {
+ // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon
+ return EActorActivity::ACTORLIB_COMMON;
+ }
+
+ TExampleLookupRequestActor(TActorId owner, TActorId replica, const TString &key)
+ : TActor(&TThis::StateWork)
+ , Owner(owner)
+ , Replica(replica)
+ , Key(key)
+ {}
+
+ STFUNC(StateWork) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExample::TEvReplicaInfo, Handle);
+ cFunc(TEvents::TEvUndelivered::EventType, HandleUndelivered);
+ cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, HandleUndelivered);
+ default:
+ break;
+ }
+ }
+};
+
+class TExampleLookupActor : public TActorBootstrapped<TExampleLookupActor> {
+ TIntrusiveConstPtr<TExampleStorageConfig> Config;
+ const TString Key;
+ const TActorId ReplyTo;
+ TVector<TActorId> RequestActors;
+
+ ui32 TotalReplicas = 0;
+ ui32 RepliedSuccess = 0;
+ ui32 RepliedError = 0;
+
+ TSet<TString> Payloads;
+
+ void Handle(TEvExample::TEvReplicaInfo::TPtr &ev) {
+ NActorsExample::TEvReplicaInfo &record = ev->Get()->Record;
+ if (record.PayloadSize()) {
+ ++RepliedSuccess;
+ for (const TString &payload : record.GetPayload()) {
+ Payloads.insert(payload);
+ }
+ }
+ else {
+ ++RepliedError;
+ }
+
+ const ui32 majority = (TotalReplicas / 2 + 1);
+ if (RepliedSuccess == majority || (RepliedSuccess + RepliedError == TotalReplicas))
+ return ReplyAndDie();
+ }
+
+ void ReplyAndDie() {
+ TVector<TString> replyPayloads(Payloads.begin(), Payloads.end());
+ Send(ReplyTo, new TEvExample::TEvInfo(Key, std::move(replyPayloads)));
+ return PassAway();
+ }
+public:
+ static constexpr IActor::EActivityType ActorActivityType() {
+ // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon
+ return EActorActivity::ACTORLIB_COMMON;
+ }
+
+ TExampleLookupActor(TExampleStorageConfig *config, const TString &key, TActorId replyTo)
+ : Config(config)
+ , Key(key)
+ , ReplyTo(replyTo)
+ {}
+
+ void Bootstrap() {
+ Y_VERIFY(Config->Replicas.size() > 0);
+
+ TotalReplicas = Config->Replicas.size();
+ RequestActors.reserve(TotalReplicas);
+ for (const auto &replica : Config->Replicas) {
+ const TActorId requestActor = Register(new TExampleLookupRequestActor(SelfId(), replica, Key));
+ RequestActors.emplace_back(requestActor);
+ }
+
+ Become(&TThis::StateWork);
+ }
+
+ STFUNC(StateWork) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExample::TEvReplicaInfo, Handle);
+ default:
+ break;
+ }
+ }
+};
+
+IActor* CreateLookupActor(TExampleStorageConfig *config, const TString &key, TActorId replyTo) {
+ return new TExampleLookupActor(config, key, replyTo);
+}
diff --git a/library/cpp/actors/examples/02_discovery/main.cpp b/library/cpp/actors/examples/02_discovery/main.cpp
new file mode 100644
index 0000000000..379fd6de84
--- /dev/null
+++ b/library/cpp/actors/examples/02_discovery/main.cpp
@@ -0,0 +1,136 @@
+#include "services.h"
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/executor_pool_basic.h>
+#include <library/cpp/actors/core/scheduler_basic.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/actors/dnsresolver/dnsresolver.h>
+#include <library/cpp/actors/interconnect/interconnect.h>
+#include <library/cpp/actors/interconnect/interconnect_common.h>
+#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
+#include <library/cpp/actors/interconnect/interconnect_tcp_server.h>
+#include <library/cpp/actors/interconnect/poller_actor.h>
+#include <library/cpp/actors/interconnect/poller_tcp.h>
+#include <library/cpp/actors/util/should_continue.h>
+
+#include <util/system/sigset.h>
+#include <util/generic/xrange.h>
+
+using namespace NActors;
+using namespace NActors::NDnsResolver;
+
+static const ui32 CfgTotalReplicaNodes = 5;
+static const ui16 CfgBasePort = 13300;
+static const ui16 CfgHttpPort = 8881;
+static const TString PublishKey = "endpoint";
+
+static TProgramShouldContinue ShouldContinue;
+
+void OnTerminate(int) {
+ ShouldContinue.ShouldStop();
+}
+
+THolder<TActorSystemSetup> BuildActorSystemSetup(ui32 nodeId, ui32 threads, NMonitoring::TDynamicCounters &counters) {
+ Y_VERIFY(threads > 0 && threads < 100);
+
+ auto setup = MakeHolder<TActorSystemSetup>();
+
+ setup->NodeId = nodeId;
+
+ setup->ExecutorsCount = 1;
+ setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]);
+ setup->Executors[0] = new TBasicExecutorPool(0, threads, 50);
+ setup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 0));
+
+ setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0));
+
+ TIntrusivePtr<TTableNameserverSetup> nameserverTable = new TTableNameserverSetup();
+ for (ui32 xnode : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) {
+ nameserverTable->StaticNodeTable[xnode] = std::make_pair("127.0.0.1", CfgBasePort + xnode);
+ }
+
+ setup->LocalServices.emplace_back(
+ MakeDnsResolverActorId(),
+ TActorSetupCmd(CreateOnDemandDnsResolver(), TMailboxType::ReadAsFilled, 0)
+ );
+
+ setup->LocalServices.emplace_back(
+ GetNameserviceActorId(),
+ TActorSetupCmd(CreateNameserverTable(nameserverTable), TMailboxType::ReadAsFilled, 0)
+ );
+
+ TIntrusivePtr<TInterconnectProxyCommon> icCommon = new TInterconnectProxyCommon();
+ icCommon->NameserviceId = GetNameserviceActorId();
+ icCommon->MonCounters = counters.GetSubgroup("counters", "interconnect");
+ icCommon->TechnicalSelfHostName = "127.0.0.1";
+
+ setup->Interconnect.ProxyActors.resize(CfgTotalReplicaNodes + 1);
+ for (ui32 xnode : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) {
+ if (xnode != nodeId) {
+ IActor *actor = new TInterconnectProxyTCP(xnode, icCommon);
+ setup->Interconnect.ProxyActors[xnode] = TActorSetupCmd(actor, TMailboxType::ReadAsFilled, 0);
+ }
+ else {
+ IActor *listener = new TInterconnectListenerTCP("127.0.0.1", CfgBasePort + xnode, icCommon);
+ setup->LocalServices.emplace_back(
+ MakeInterconnectListenerActorId(false),
+ TActorSetupCmd(listener, TMailboxType::ReadAsFilled, 0)
+ );
+ }
+ }
+
+ return setup;
+}
+
+int main(int argc, char **argv) {
+ Y_UNUSED(argc);
+ Y_UNUSED(argv);
+
+#ifdef _unix_
+ signal(SIGPIPE, SIG_IGN);
+#endif
+ signal(SIGINT, &OnTerminate);
+ signal(SIGTERM, &OnTerminate);
+
+ TIntrusivePtr<TExampleStorageConfig> config = new TExampleStorageConfig();
+ for (ui32 nodeid : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) {
+ config->Replicas.push_back(MakeReplicaId(nodeid));
+ }
+
+ TVector<THolder<TActorSystem>> actorSystemHolder;
+ TVector<TIntrusivePtr<NMonitoring::TDynamicCounters>> countersHolder;
+ for (ui32 nodeid : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) {
+ countersHolder.emplace_back(new NMonitoring::TDynamicCounters());
+ THolder<TActorSystemSetup> actorSystemSetup = BuildActorSystemSetup(nodeid, 2, *countersHolder.back());
+ actorSystemSetup->LocalServices.emplace_back(
+ TActorId(),
+ TActorSetupCmd(CreateEndpointActor(config.Get(), PublishKey, CfgHttpPort + nodeid), TMailboxType::HTSwap, 0)
+ );
+
+ actorSystemSetup->LocalServices.emplace_back(
+ MakeReplicaId(nodeid),
+ TActorSetupCmd(CreateReplica(), TMailboxType::ReadAsFilled, 0)
+ );
+
+ actorSystemHolder.emplace_back(new TActorSystem(actorSystemSetup));
+ }
+
+ for (auto &xh : actorSystemHolder)
+ xh->Start();
+
+ while (ShouldContinue.PollState() == TProgramShouldContinue::Continue) {
+ Sleep(TDuration::MilliSeconds(200));
+ }
+
+ // stop actorsystem to not generate new reqeusts for external services
+ // no events would be processed anymore
+ for (auto &xh : actorSystemHolder)
+ xh->Stop();
+
+ // and then cleanup actorsystem
+ // from this moment working with actorsystem prohibited
+ for (auto &xh : actorSystemHolder)
+ xh->Cleanup();
+
+ return ShouldContinue.GetReturnCode();
+}
diff --git a/library/cpp/actors/examples/02_discovery/protocol.proto b/library/cpp/actors/examples/02_discovery/protocol.proto
new file mode 100644
index 0000000000..41cc2cc9c8
--- /dev/null
+++ b/library/cpp/actors/examples/02_discovery/protocol.proto
@@ -0,0 +1,19 @@
+package NActorsExample;
+
+message TEvReplicaInfo {
+ optional string Key = 1;
+ repeated string Payload = 2;
+};
+
+message TEvReplicaPublishAck {
+ optional string Key = 1;
+};
+
+message TEvReplicaPublish {
+ optional string Key = 1;
+ optional string Payload = 2;
+};
+
+message TEvReplicaLookup {
+ optional string Key = 1;
+};
diff --git a/library/cpp/actors/examples/02_discovery/publish.cpp b/library/cpp/actors/examples/02_discovery/publish.cpp
new file mode 100644
index 0000000000..8dc5fbcea4
--- /dev/null
+++ b/library/cpp/actors/examples/02_discovery/publish.cpp
@@ -0,0 +1,113 @@
+#include "services.h"
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/interconnect.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <util/generic/set.h>
+#include <util/generic/vector.h>
+
+class TExamplePublishReplicaActor : public TActorBootstrapped<TExamplePublishReplicaActor> {
+ const TActorId Owner;
+ const TActorId Replica;
+ const TString Key;
+ const TString Payload;
+
+ void PassAway() override {
+ const ui32 replicaNode = Replica.NodeId();
+ if (replicaNode != SelfId().NodeId()) {
+ const TActorId &interconnectProxy = TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(Replica.NodeId());
+ Send(interconnectProxy, new TEvents::TEvUnsubscribe());
+ }
+ return IActor::PassAway();
+ }
+
+ void SomeSleep() {
+ Become(&TThis::StateSleep, TDuration::MilliSeconds(250), new TEvents::TEvWakeup());
+ }
+public:
+ static constexpr IActor::EActivityType ActorActivityType() {
+ // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon
+ return EActorActivity::ACTORLIB_COMMON;
+ }
+
+ TExamplePublishReplicaActor(TActorId owner, TActorId replica, const TString &key, const TString &payload)
+ : Owner(owner)
+ , Replica(replica)
+ , Key(key)
+ , Payload(payload)
+ {}
+
+ void Bootstrap() {
+ const ui32 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession;
+ Send(Replica, new TEvExample::TEvReplicaPublish(Key, Payload), flags);
+ Become(&TThis::StatePublish);
+ }
+
+ STFUNC(StatePublish) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ cFunc(TEvents::TEvPoison::EventType, PassAway);
+ cFunc(TEvents::TEvUndelivered::EventType, SomeSleep);
+ cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, SomeSleep);
+ default:
+ break;
+ }
+ }
+
+ STFUNC(StateSleep) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ cFunc(TEvents::TEvPoison::EventType, PassAway);
+ cFunc(TEvents::TEvWakeup::EventType, Bootstrap);
+ default:
+ break;
+ }
+ }
+};
+
+class TExamplePublishActor : public TActorBootstrapped<TExamplePublishActor> {
+ TIntrusiveConstPtr<TExampleStorageConfig> Config;
+ const TString Key;
+ const TString Payload;
+ TVector<TActorId> PublishActors;
+
+ void PassAway() override {
+ for (const auto &x : PublishActors)
+ Send(x, new TEvents::TEvPoison());
+ return IActor::PassAway();
+ }
+public:
+ static constexpr IActor::EActivityType ActorActivityType() {
+ // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon
+ return EActorActivity::ACTORLIB_COMMON;
+ }
+
+ TExamplePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what)
+ : Config(config)
+ , Key(key)
+ , Payload(what)
+ {}
+
+ void Bootstrap() {
+ for (auto &replica : Config->Replicas) {
+ const TActorId x = Register(new TExamplePublishReplicaActor(SelfId(), replica, Key, Payload));
+ PublishActors.emplace_back(x);
+ }
+
+ Become(&TThis::StateWork);
+ }
+
+ STFUNC(StateWork) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ cFunc(TEvents::TEvPoison::EventType, PassAway);
+ default:
+ break;
+ }
+ }
+};
+
+IActor* CreatePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what) {
+ return new TExamplePublishActor(config, key, what);
+}
diff --git a/library/cpp/actors/examples/02_discovery/replica.cpp b/library/cpp/actors/examples/02_discovery/replica.cpp
new file mode 100644
index 0000000000..74fdfc1910
--- /dev/null
+++ b/library/cpp/actors/examples/02_discovery/replica.cpp
@@ -0,0 +1,182 @@
+#include "services.h"
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/interconnect.h>
+#include <util/generic/set.h>
+#include <util/generic/hash_set.h>
+#include <util/generic/vector.h>
+
+class TExampleReplicaActor : public TActor<TExampleReplicaActor> {
+ using TOwnerIndex = TMap<TActorId, ui32, TActorId::TOrderedCmp>;
+ using TKeyIndex = THashMap<TString, TSet<ui32>>;
+
+ struct TEntry {
+ TString Payload;
+ TActorId Owner;
+ TOwnerIndex::iterator OwnerIt;
+ TKeyIndex::iterator KeyIt;
+ };
+
+ TVector<TEntry> Entries;
+ TVector<ui32> AvailableEntries;
+
+ TOwnerIndex IndexOwner;
+ TKeyIndex IndexKey;
+
+ ui32 AllocateEntry() {
+ ui32 ret;
+ if (AvailableEntries) {
+ ret = AvailableEntries.back();
+ AvailableEntries.pop_back();
+ }
+ else {
+ ret = Entries.size();
+ Entries.emplace_back();
+ }
+
+ return ret;
+ }
+
+ bool IsLastEntryOnNode(TOwnerIndex::iterator ownerIt) {
+ const ui32 ownerNodeId = ownerIt->first.NodeId();
+ if (ownerIt != IndexOwner.begin()) {
+ auto x = ownerIt;
+ --x;
+ if (x->first.NodeId() == ownerNodeId)
+ return false;
+ }
+
+ ++ownerIt;
+ if (ownerIt != IndexOwner.end()) {
+ if (ownerIt->first.NodeId() == ownerNodeId)
+ return false;
+ }
+
+ return true;
+ }
+
+ void CleanupEntry(ui32 entryIndex) {
+ TEntry &entry = Entries[entryIndex];
+ entry.KeyIt->second.erase(entryIndex);
+ if (entry.KeyIt->second.empty())
+ IndexKey.erase(entry.KeyIt);
+
+ if (IsLastEntryOnNode(entry.OwnerIt))
+ Send(TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(entry.OwnerIt->first.NodeId()), new TEvents::TEvUnsubscribe());
+
+ IndexOwner.erase(entry.OwnerIt);
+
+ TString().swap(entry.Payload);
+ entry.Owner = TActorId();
+ entry.KeyIt = IndexKey.end();
+ entry.OwnerIt = IndexOwner.end();
+
+ AvailableEntries.emplace_back(entryIndex);
+ }
+
+ void Handle(TEvExample::TEvReplicaLookup::TPtr &ev) {
+ auto &record = ev->Get()->Record;
+ const auto &key = record.GetKey();
+
+ auto keyIt = IndexKey.find(key);
+ if (keyIt == IndexKey.end()) {
+ Send(ev->Sender, new TEvExample::TEvReplicaInfo(key), 0, ev->Cookie);
+ return;
+ }
+
+ auto reply = MakeHolder<TEvExample::TEvReplicaInfo>(key);
+ reply->Record.MutablePayload()->Reserve(keyIt->second.size());
+ for (ui32 entryIndex : keyIt->second) {
+ const TEntry &entry = Entries[entryIndex];
+ reply->Record.AddPayload(entry.Payload);
+ }
+
+ Send(ev->Sender, std::move(reply), 0, ev->Cookie);
+ }
+
+ void Handle(TEvExample::TEvReplicaPublish::TPtr &ev) {
+ auto &record = ev->Get()->Record;
+ const TString &key = record.GetKey();
+ const TString &payload = record.GetPayload();
+ const TActorId &owner = ev->Sender;
+
+ auto ownerIt = IndexOwner.find(owner);
+ if (ownerIt != IndexOwner.end()) {
+ const ui32 entryIndex = ownerIt->second;
+ TEntry &entry = Entries[entryIndex];
+ if (entry.KeyIt->first != key) {
+ // reply nothing, request suspicious
+ return;
+ }
+
+ entry.Payload = payload;
+ }
+ else {
+ const ui32 entryIndex = AllocateEntry();
+ TEntry &entry = Entries[entryIndex];
+
+ entry.Payload = payload;
+ entry.Owner = owner;
+
+ entry.OwnerIt = IndexOwner.emplace(owner, entryIndex).first;
+ entry.KeyIt = IndexKey.emplace(std::make_pair(key, TSet<ui32>())).first;
+ entry.KeyIt->second.emplace(entryIndex);
+
+ Send(owner, new TEvExample::TEvReplicaPublishAck(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, ev->Cookie);
+ }
+ }
+
+ void Handle(TEvents::TEvUndelivered::TPtr &ev) {
+ auto ownerIt = IndexOwner.find(ev->Sender);
+ if (ownerIt == IndexOwner.end())
+ return;
+
+ CleanupEntry(ownerIt->second);
+ }
+
+ void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) {
+ auto *msg = ev->Get();
+ const ui32 nodeId = msg->NodeId;
+ auto ownerIt = IndexOwner.lower_bound(TActorId(nodeId, 0, 0, 0));
+ while (ownerIt != IndexOwner.end() && ownerIt->first.NodeId() == nodeId) {
+ const ui32 idx = ownerIt->second;
+ ++ownerIt;
+ CleanupEntry(idx);
+ }
+ }
+
+public:
+ static constexpr IActor::EActivityType ActorActivityType() {
+ // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon
+ return EActorActivity::ACTORLIB_COMMON;
+ }
+
+ TExampleReplicaActor()
+ : TActor(&TThis::StateWork)
+ {}
+
+ STFUNC(StateWork) {
+ Y_UNUSED(ctx);
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvExample::TEvReplicaLookup, Handle);
+ hFunc(TEvExample::TEvReplicaPublish, Handle);
+ hFunc(TEvents::TEvUndelivered, Handle);
+ hFunc(TEvInterconnect::TEvNodeDisconnected, Handle);
+
+ IgnoreFunc(TEvInterconnect::TEvNodeConnected);
+ default:
+ // here is place to spam some log message on unknown events
+ break;
+ }
+ }
+};
+
+IActor* CreateReplica() {
+ return new TExampleReplicaActor();
+}
+
+TActorId MakeReplicaId(ui32 nodeid) {
+ char x[12] = { 'r', 'p', 'l' };
+ memcpy(x + 5, &nodeid, sizeof(ui32));
+ return TActorId(nodeid, TStringBuf(x, 12));
+}
diff --git a/library/cpp/actors/examples/02_discovery/services.h b/library/cpp/actors/examples/02_discovery/services.h
new file mode 100644
index 0000000000..266517c577
--- /dev/null
+++ b/library/cpp/actors/examples/02_discovery/services.h
@@ -0,0 +1,85 @@
+#pragma once
+#include <library/cpp/actors/examples/02_discovery/protocol.pb.h>
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/core/event_pb.h>
+#include <library/cpp/actors/core/event_local.h>
+
+#include <util/generic/vector.h>
+
+using namespace NActors;
+
+struct TExampleStorageConfig : public TThrRefBase {
+ TVector<TActorId> Replicas;
+};
+
+struct TEvExample {
+ enum EEv {
+ EvReplicaLookup = EventSpaceBegin(TEvents::ES_USERSPACE + 1),
+ EvReplicaPublish,
+
+ EvReplicaInfo = EventSpaceBegin(TEvents::ES_USERSPACE + 2),
+ EvReplicaPublishAck,
+
+ EvInfo = EventSpaceBegin(TEvents::ES_USERSPACE + 3),
+ };
+
+ struct TEvReplicaLookup : public TEventPB<TEvReplicaLookup, NActorsExample::TEvReplicaLookup, EvReplicaLookup> {
+ TEvReplicaLookup()
+ {}
+
+ TEvReplicaLookup(const TString &key)
+ {
+ Record.SetKey(key);
+ }
+ };
+
+ struct TEvReplicaPublish : public TEventPB<TEvReplicaPublish, NActorsExample::TEvReplicaPublish, EvReplicaPublish> {
+ TEvReplicaPublish()
+ {}
+
+ TEvReplicaPublish(const TString &key, const TString &payload)
+ {
+ Record.SetKey(key);
+ Record.SetPayload(payload);
+ }
+ };
+
+ struct TEvReplicaInfo : public TEventPB<TEvReplicaInfo, NActorsExample::TEvReplicaInfo, EvReplicaInfo> {
+ TEvReplicaInfo()
+ {}
+
+ TEvReplicaInfo(const TString &key)
+ {
+ Record.SetKey(key);
+ }
+ };
+
+ struct TEvReplicaPublishAck : public TEventPB<TEvReplicaPublishAck, NActorsExample::TEvReplicaPublishAck, EvReplicaPublishAck> {
+ TEvReplicaPublishAck()
+ {}
+
+ TEvReplicaPublishAck(const TString &key)
+ {
+ Record.SetKey(key);
+ }
+ };
+
+ struct TEvInfo : public TEventLocal<TEvInfo, EvInfo> {
+ const TString Key;
+ const TVector<TString> Payloads;
+
+ TEvInfo(const TString &key, TVector<TString> &&payloads)
+ : Key(key)
+ , Payloads(payloads)
+ {}
+ };
+};
+
+IActor* CreateReplica();
+IActor* CreatePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what);
+IActor* CreateLookupActor(TExampleStorageConfig *config, const TString &key, TActorId replyTo);
+IActor* CreateEndpointActor(TExampleStorageConfig *config, const TString &publishKey, ui16 httpPort);
+
+TActorId MakeReplicaId(ui32 nodeid);