aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/actors/examples/02_discovery/main.cpp
diff options
context:
space:
mode:
authorDaniil Cherednik <dan.cherednik@gmail.com>2023-06-02 18:26:46 +0300
committerDaniil Cherednik <dan.cherednik@gmail.com>2023-06-02 18:32:58 +0300
commit7e7de263d4acbc6eacf92b618bcb5f9049bccc9b (patch)
treed25ff63925cbcc5e0f80fe4d1514b4cb48a9f686 /library/cpp/actors/examples/02_discovery/main.cpp
parent10ba5cc0c3d130ce4b33d307d265b937dd572c39 (diff)
downloadydb-7e7de263d4acbc6eacf92b618bcb5f9049bccc9b.tar.gz
add library/cpp/actors to github export
x-stable-origin-commit: 0e951cfb44430a0ed33bec779c8a790f73c31b91
Diffstat (limited to 'library/cpp/actors/examples/02_discovery/main.cpp')
-rw-r--r--library/cpp/actors/examples/02_discovery/main.cpp136
1 files changed, 136 insertions, 0 deletions
diff --git a/library/cpp/actors/examples/02_discovery/main.cpp b/library/cpp/actors/examples/02_discovery/main.cpp
new file mode 100644
index 0000000000..379fd6de84
--- /dev/null
+++ b/library/cpp/actors/examples/02_discovery/main.cpp
@@ -0,0 +1,136 @@
+#include "services.h"
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/actors/core/executor_pool_basic.h>
+#include <library/cpp/actors/core/scheduler_basic.h>
+#include <library/cpp/actors/core/log.h>
+#include <library/cpp/actors/dnsresolver/dnsresolver.h>
+#include <library/cpp/actors/interconnect/interconnect.h>
+#include <library/cpp/actors/interconnect/interconnect_common.h>
+#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h>
+#include <library/cpp/actors/interconnect/interconnect_tcp_server.h>
+#include <library/cpp/actors/interconnect/poller_actor.h>
+#include <library/cpp/actors/interconnect/poller_tcp.h>
+#include <library/cpp/actors/util/should_continue.h>
+
+#include <util/system/sigset.h>
+#include <util/generic/xrange.h>
+
+using namespace NActors;
+using namespace NActors::NDnsResolver;
+
+static const ui32 CfgTotalReplicaNodes = 5;
+static const ui16 CfgBasePort = 13300;
+static const ui16 CfgHttpPort = 8881;
+static const TString PublishKey = "endpoint";
+
+static TProgramShouldContinue ShouldContinue;
+
+void OnTerminate(int) {
+ ShouldContinue.ShouldStop();
+}
+
+THolder<TActorSystemSetup> BuildActorSystemSetup(ui32 nodeId, ui32 threads, NMonitoring::TDynamicCounters &counters) {
+ Y_VERIFY(threads > 0 && threads < 100);
+
+ auto setup = MakeHolder<TActorSystemSetup>();
+
+ setup->NodeId = nodeId;
+
+ setup->ExecutorsCount = 1;
+ setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]);
+ setup->Executors[0] = new TBasicExecutorPool(0, threads, 50);
+ setup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 0));
+
+ setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0));
+
+ TIntrusivePtr<TTableNameserverSetup> nameserverTable = new TTableNameserverSetup();
+ for (ui32 xnode : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) {
+ nameserverTable->StaticNodeTable[xnode] = std::make_pair("127.0.0.1", CfgBasePort + xnode);
+ }
+
+ setup->LocalServices.emplace_back(
+ MakeDnsResolverActorId(),
+ TActorSetupCmd(CreateOnDemandDnsResolver(), TMailboxType::ReadAsFilled, 0)
+ );
+
+ setup->LocalServices.emplace_back(
+ GetNameserviceActorId(),
+ TActorSetupCmd(CreateNameserverTable(nameserverTable), TMailboxType::ReadAsFilled, 0)
+ );
+
+ TIntrusivePtr<TInterconnectProxyCommon> icCommon = new TInterconnectProxyCommon();
+ icCommon->NameserviceId = GetNameserviceActorId();
+ icCommon->MonCounters = counters.GetSubgroup("counters", "interconnect");
+ icCommon->TechnicalSelfHostName = "127.0.0.1";
+
+ setup->Interconnect.ProxyActors.resize(CfgTotalReplicaNodes + 1);
+ for (ui32 xnode : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) {
+ if (xnode != nodeId) {
+ IActor *actor = new TInterconnectProxyTCP(xnode, icCommon);
+ setup->Interconnect.ProxyActors[xnode] = TActorSetupCmd(actor, TMailboxType::ReadAsFilled, 0);
+ }
+ else {
+ IActor *listener = new TInterconnectListenerTCP("127.0.0.1", CfgBasePort + xnode, icCommon);
+ setup->LocalServices.emplace_back(
+ MakeInterconnectListenerActorId(false),
+ TActorSetupCmd(listener, TMailboxType::ReadAsFilled, 0)
+ );
+ }
+ }
+
+ return setup;
+}
+
+int main(int argc, char **argv) {
+ Y_UNUSED(argc);
+ Y_UNUSED(argv);
+
+#ifdef _unix_
+ signal(SIGPIPE, SIG_IGN);
+#endif
+ signal(SIGINT, &OnTerminate);
+ signal(SIGTERM, &OnTerminate);
+
+ TIntrusivePtr<TExampleStorageConfig> config = new TExampleStorageConfig();
+ for (ui32 nodeid : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) {
+ config->Replicas.push_back(MakeReplicaId(nodeid));
+ }
+
+ TVector<THolder<TActorSystem>> actorSystemHolder;
+ TVector<TIntrusivePtr<NMonitoring::TDynamicCounters>> countersHolder;
+ for (ui32 nodeid : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) {
+ countersHolder.emplace_back(new NMonitoring::TDynamicCounters());
+ THolder<TActorSystemSetup> actorSystemSetup = BuildActorSystemSetup(nodeid, 2, *countersHolder.back());
+ actorSystemSetup->LocalServices.emplace_back(
+ TActorId(),
+ TActorSetupCmd(CreateEndpointActor(config.Get(), PublishKey, CfgHttpPort + nodeid), TMailboxType::HTSwap, 0)
+ );
+
+ actorSystemSetup->LocalServices.emplace_back(
+ MakeReplicaId(nodeid),
+ TActorSetupCmd(CreateReplica(), TMailboxType::ReadAsFilled, 0)
+ );
+
+ actorSystemHolder.emplace_back(new TActorSystem(actorSystemSetup));
+ }
+
+ for (auto &xh : actorSystemHolder)
+ xh->Start();
+
+ while (ShouldContinue.PollState() == TProgramShouldContinue::Continue) {
+ Sleep(TDuration::MilliSeconds(200));
+ }
+
+ // stop actorsystem to not generate new reqeusts for external services
+ // no events would be processed anymore
+ for (auto &xh : actorSystemHolder)
+ xh->Stop();
+
+ // and then cleanup actorsystem
+ // from this moment working with actorsystem prohibited
+ for (auto &xh : actorSystemHolder)
+ xh->Cleanup();
+
+ return ShouldContinue.GetReturnCode();
+}