diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-06-02 18:26:46 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-06-02 18:32:58 +0300 |
commit | 7e7de263d4acbc6eacf92b618bcb5f9049bccc9b (patch) | |
tree | d25ff63925cbcc5e0f80fe4d1514b4cb48a9f686 /library/cpp/actors/examples/02_discovery/main.cpp | |
parent | 10ba5cc0c3d130ce4b33d307d265b937dd572c39 (diff) | |
download | ydb-7e7de263d4acbc6eacf92b618bcb5f9049bccc9b.tar.gz |
add library/cpp/actors to github export
x-stable-origin-commit: 0e951cfb44430a0ed33bec779c8a790f73c31b91
Diffstat (limited to 'library/cpp/actors/examples/02_discovery/main.cpp')
-rw-r--r-- | library/cpp/actors/examples/02_discovery/main.cpp | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/library/cpp/actors/examples/02_discovery/main.cpp b/library/cpp/actors/examples/02_discovery/main.cpp new file mode 100644 index 0000000000..379fd6de84 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/main.cpp @@ -0,0 +1,136 @@ +#include "services.h" + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/executor_pool_basic.h> +#include <library/cpp/actors/core/scheduler_basic.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/actors/dnsresolver/dnsresolver.h> +#include <library/cpp/actors/interconnect/interconnect.h> +#include <library/cpp/actors/interconnect/interconnect_common.h> +#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> +#include <library/cpp/actors/interconnect/interconnect_tcp_server.h> +#include <library/cpp/actors/interconnect/poller_actor.h> +#include <library/cpp/actors/interconnect/poller_tcp.h> +#include <library/cpp/actors/util/should_continue.h> + +#include <util/system/sigset.h> +#include <util/generic/xrange.h> + +using namespace NActors; +using namespace NActors::NDnsResolver; + +static const ui32 CfgTotalReplicaNodes = 5; +static const ui16 CfgBasePort = 13300; +static const ui16 CfgHttpPort = 8881; +static const TString PublishKey = "endpoint"; + +static TProgramShouldContinue ShouldContinue; + +void OnTerminate(int) { + ShouldContinue.ShouldStop(); +} + +THolder<TActorSystemSetup> BuildActorSystemSetup(ui32 nodeId, ui32 threads, NMonitoring::TDynamicCounters &counters) { + Y_VERIFY(threads > 0 && threads < 100); + + auto setup = MakeHolder<TActorSystemSetup>(); + + setup->NodeId = nodeId; + + setup->ExecutorsCount = 1; + setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]); + setup->Executors[0] = new TBasicExecutorPool(0, threads, 50); + setup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 0)); + + setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0)); + + TIntrusivePtr<TTableNameserverSetup> nameserverTable = new TTableNameserverSetup(); + for (ui32 xnode : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) { + nameserverTable->StaticNodeTable[xnode] = std::make_pair("127.0.0.1", CfgBasePort + xnode); + } + + setup->LocalServices.emplace_back( + MakeDnsResolverActorId(), + TActorSetupCmd(CreateOnDemandDnsResolver(), TMailboxType::ReadAsFilled, 0) + ); + + setup->LocalServices.emplace_back( + GetNameserviceActorId(), + TActorSetupCmd(CreateNameserverTable(nameserverTable), TMailboxType::ReadAsFilled, 0) + ); + + TIntrusivePtr<TInterconnectProxyCommon> icCommon = new TInterconnectProxyCommon(); + icCommon->NameserviceId = GetNameserviceActorId(); + icCommon->MonCounters = counters.GetSubgroup("counters", "interconnect"); + icCommon->TechnicalSelfHostName = "127.0.0.1"; + + setup->Interconnect.ProxyActors.resize(CfgTotalReplicaNodes + 1); + for (ui32 xnode : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) { + if (xnode != nodeId) { + IActor *actor = new TInterconnectProxyTCP(xnode, icCommon); + setup->Interconnect.ProxyActors[xnode] = TActorSetupCmd(actor, TMailboxType::ReadAsFilled, 0); + } + else { + IActor *listener = new TInterconnectListenerTCP("127.0.0.1", CfgBasePort + xnode, icCommon); + setup->LocalServices.emplace_back( + MakeInterconnectListenerActorId(false), + TActorSetupCmd(listener, TMailboxType::ReadAsFilled, 0) + ); + } + } + + return setup; +} + +int main(int argc, char **argv) { + Y_UNUSED(argc); + Y_UNUSED(argv); + +#ifdef _unix_ + signal(SIGPIPE, SIG_IGN); +#endif + signal(SIGINT, &OnTerminate); + signal(SIGTERM, &OnTerminate); + + TIntrusivePtr<TExampleStorageConfig> config = new TExampleStorageConfig(); + for (ui32 nodeid : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) { + config->Replicas.push_back(MakeReplicaId(nodeid)); + } + + TVector<THolder<TActorSystem>> actorSystemHolder; + TVector<TIntrusivePtr<NMonitoring::TDynamicCounters>> countersHolder; + for (ui32 nodeid : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) { + countersHolder.emplace_back(new NMonitoring::TDynamicCounters()); + THolder<TActorSystemSetup> actorSystemSetup = BuildActorSystemSetup(nodeid, 2, *countersHolder.back()); + actorSystemSetup->LocalServices.emplace_back( + TActorId(), + TActorSetupCmd(CreateEndpointActor(config.Get(), PublishKey, CfgHttpPort + nodeid), TMailboxType::HTSwap, 0) + ); + + actorSystemSetup->LocalServices.emplace_back( + MakeReplicaId(nodeid), + TActorSetupCmd(CreateReplica(), TMailboxType::ReadAsFilled, 0) + ); + + actorSystemHolder.emplace_back(new TActorSystem(actorSystemSetup)); + } + + for (auto &xh : actorSystemHolder) + xh->Start(); + + while (ShouldContinue.PollState() == TProgramShouldContinue::Continue) { + Sleep(TDuration::MilliSeconds(200)); + } + + // stop actorsystem to not generate new reqeusts for external services + // no events would be processed anymore + for (auto &xh : actorSystemHolder) + xh->Stop(); + + // and then cleanup actorsystem + // from this moment working with actorsystem prohibited + for (auto &xh : actorSystemHolder) + xh->Cleanup(); + + return ShouldContinue.GetReturnCode(); +} |