diff options
author | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-06-02 18:26:46 +0300 |
---|---|---|
committer | Daniil Cherednik <dan.cherednik@gmail.com> | 2023-06-02 18:32:58 +0300 |
commit | 7e7de263d4acbc6eacf92b618bcb5f9049bccc9b (patch) | |
tree | d25ff63925cbcc5e0f80fe4d1514b4cb48a9f686 /library/cpp/actors/examples/02_discovery | |
parent | 10ba5cc0c3d130ce4b33d307d265b937dd572c39 (diff) | |
download | ydb-7e7de263d4acbc6eacf92b618bcb5f9049bccc9b.tar.gz |
add library/cpp/actors to github export
x-stable-origin-commit: 0e951cfb44430a0ed33bec779c8a790f73c31b91
Diffstat (limited to 'library/cpp/actors/examples/02_discovery')
11 files changed, 966 insertions, 0 deletions
diff --git a/library/cpp/actors/examples/02_discovery/CMakeLists.darwin.txt b/library/cpp/actors/examples/02_discovery/CMakeLists.darwin.txt new file mode 100644 index 0000000000..2eaa6534d8 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/CMakeLists.darwin.txt @@ -0,0 +1,53 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(example_02_discovery) +target_link_libraries(example_02_discovery PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + library-cpp-cpuid_check + cpp-actors-core + cpp-actors-dnsresolver + cpp-actors-interconnect + cpp-actors-http + contrib-libs-protobuf +) +target_link_options(example_02_discovery PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_proto_messages(example_02_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/protocol.proto +) +target_sources(example_02_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/endpoint.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/lookup.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/main.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/publish.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/replica.cpp +) +target_proto_addincls(example_02_discovery + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(example_02_discovery + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +vcs_info(example_02_discovery) diff --git a/library/cpp/actors/examples/02_discovery/CMakeLists.linux-aarch64.txt b/library/cpp/actors/examples/02_discovery/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..1883d9ddad --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/CMakeLists.linux-aarch64.txt @@ -0,0 +1,55 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(example_02_discovery) +target_link_libraries(example_02_discovery PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + cpp-actors-core + cpp-actors-dnsresolver + cpp-actors-interconnect + cpp-actors-http + contrib-libs-protobuf +) +target_link_options(example_02_discovery PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_proto_messages(example_02_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/protocol.proto +) +target_sources(example_02_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/endpoint.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/lookup.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/main.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/publish.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/replica.cpp +) +target_proto_addincls(example_02_discovery + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(example_02_discovery + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +vcs_info(example_02_discovery) diff --git a/library/cpp/actors/examples/02_discovery/CMakeLists.linux.txt b/library/cpp/actors/examples/02_discovery/CMakeLists.linux.txt new file mode 100644 index 0000000000..855362b4ab --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/CMakeLists.linux.txt @@ -0,0 +1,56 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(example_02_discovery) +target_link_libraries(example_02_discovery PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + library-cpp-cpuid_check + cpp-actors-core + cpp-actors-dnsresolver + cpp-actors-interconnect + cpp-actors-http + contrib-libs-protobuf +) +target_link_options(example_02_discovery PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_proto_messages(example_02_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/protocol.proto +) +target_sources(example_02_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/endpoint.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/lookup.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/main.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/publish.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/replica.cpp +) +target_proto_addincls(example_02_discovery + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(example_02_discovery + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +vcs_info(example_02_discovery) diff --git a/library/cpp/actors/examples/02_discovery/CMakeLists.txt b/library/cpp/actors/examples/02_discovery/CMakeLists.txt new file mode 100644 index 0000000000..3e0811fb22 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/CMakeLists.txt @@ -0,0 +1,15 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE) + include(CMakeLists.darwin.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux.txt) +endif() diff --git a/library/cpp/actors/examples/02_discovery/endpoint.cpp b/library/cpp/actors/examples/02_discovery/endpoint.cpp new file mode 100644 index 0000000000..97780e8b4c --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/endpoint.cpp @@ -0,0 +1,118 @@ +#include "services.h" + +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/protos/services_common.pb.h> + +#include <library/cpp/actors/http/http.h> +#include <library/cpp/actors/http/http_proxy.h> + +#include <util/system/hostname.h> +#include <util/string/builder.h> + +class TExampleHttpRequest : public TActor<TExampleHttpRequest> { + TIntrusivePtr<TExampleStorageConfig> Config; + const TString PublishKey; + + TActorId HttpProxy; + NHttp::THttpIncomingRequestPtr Request; + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr &ev) { + Request = std::move(ev->Get()->Request); + HttpProxy = ev->Sender; + + Register(CreateLookupActor(Config.Get(), PublishKey, SelfId())); + } + + void Handle(TEvExample::TEvInfo::TPtr &ev) { + auto *msg = ev->Get(); + + TStringBuilder body; + for (const auto &x : msg->Payloads) + body << x << Endl; + + auto response = Request->CreateResponseOK(body, "application/text; charset=utf-8"); + Send(HttpProxy, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); + + PassAway(); + } +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExampleHttpRequest(TExampleStorageConfig *config, const TString &publishKey) + : TActor(&TThis::StateWork) + , Config(config) + , PublishKey(publishKey) + {} + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + hFunc(TEvExample::TEvInfo, Handle); + } + } +}; + +class TExampleHttpEndpoint : public TActorBootstrapped<TExampleHttpEndpoint> { + TIntrusivePtr<TExampleStorageConfig> Config; + const TString PublishKey; + const ui16 HttpPort; + + TActorId PublishActor; + TActorId HttpProxy; + + std::shared_ptr<NMonitoring::TMetricRegistry> SensorsRegistry = std::make_shared<NMonitoring::TMetricRegistry>(); + + void PassAway() override { + Send(PublishActor, new TEvents::TEvPoison()); + Send(HttpProxy, new TEvents::TEvPoison()); + + return TActor::PassAway(); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr &ev) { + const TActorId reqActor = Register(new TExampleHttpRequest(Config.Get(), PublishKey)); + TlsActivationContext->Send(ev->Forward(reqActor)); + } + +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExampleHttpEndpoint(TExampleStorageConfig *config, const TString &publishKey, ui16 port) + : Config(config) + , PublishKey(publishKey) + , HttpPort(port) + { + } + + void Bootstrap() { + const TString publishPayload = ToString(HttpPort); + PublishActor = Register(CreatePublishActor(Config.Get(), PublishKey, publishPayload)); + HttpProxy = Register(NHttp::CreateHttpProxy(SensorsRegistry)); + + Send(HttpProxy, new NHttp::TEvHttpProxy::TEvAddListeningPort(HttpPort, FQDNHostName())); + Send(HttpProxy, new NHttp::TEvHttpProxy::TEvRegisterHandler("/list", SelfId())); + + Become(&TThis::StateWork); + } + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + default: + break; + } + } +}; + +IActor* CreateEndpointActor(TExampleStorageConfig *config, const TString &publishKey, ui16 port) { + return new TExampleHttpEndpoint(config, publishKey, port); +} diff --git a/library/cpp/actors/examples/02_discovery/lookup.cpp b/library/cpp/actors/examples/02_discovery/lookup.cpp new file mode 100644 index 0000000000..979a38d215 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/lookup.cpp @@ -0,0 +1,134 @@ +#include "services.h" + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/interconnect.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <util/generic/set.h> +#include <util/generic/vector.h> + +class TExampleLookupRequestActor : public TActor<TExampleLookupRequestActor> { + const TActorId Owner; + const TActorId Replica; + const TString Key; + + void Registered(TActorSystem* sys, const TActorId&) override { + const auto flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession; + sys->Send(new IEventHandle(Replica, SelfId(), new TEvExample::TEvReplicaLookup(Key), flags)); + } + + void PassAway() override { + const ui32 replicaNode = Replica.NodeId(); + if (replicaNode != SelfId().NodeId()) { + const TActorId &interconnectProxy = TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(Replica.NodeId()); + Send(interconnectProxy, new TEvents::TEvUnsubscribe()); + } + return IActor::PassAway(); + } + + void Handle(TEvExample::TEvReplicaInfo::TPtr &ev) { + Send(Owner, ev->Release().Release()); + return PassAway(); + } + + void HandleUndelivered() { + Send(Owner, new TEvExample::TEvReplicaInfo(Key)); + return PassAway(); + } +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExampleLookupRequestActor(TActorId owner, TActorId replica, const TString &key) + : TActor(&TThis::StateWork) + , Owner(owner) + , Replica(replica) + , Key(key) + {} + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(TEvExample::TEvReplicaInfo, Handle); + cFunc(TEvents::TEvUndelivered::EventType, HandleUndelivered); + cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, HandleUndelivered); + default: + break; + } + } +}; + +class TExampleLookupActor : public TActorBootstrapped<TExampleLookupActor> { + TIntrusiveConstPtr<TExampleStorageConfig> Config; + const TString Key; + const TActorId ReplyTo; + TVector<TActorId> RequestActors; + + ui32 TotalReplicas = 0; + ui32 RepliedSuccess = 0; + ui32 RepliedError = 0; + + TSet<TString> Payloads; + + void Handle(TEvExample::TEvReplicaInfo::TPtr &ev) { + NActorsExample::TEvReplicaInfo &record = ev->Get()->Record; + if (record.PayloadSize()) { + ++RepliedSuccess; + for (const TString &payload : record.GetPayload()) { + Payloads.insert(payload); + } + } + else { + ++RepliedError; + } + + const ui32 majority = (TotalReplicas / 2 + 1); + if (RepliedSuccess == majority || (RepliedSuccess + RepliedError == TotalReplicas)) + return ReplyAndDie(); + } + + void ReplyAndDie() { + TVector<TString> replyPayloads(Payloads.begin(), Payloads.end()); + Send(ReplyTo, new TEvExample::TEvInfo(Key, std::move(replyPayloads))); + return PassAway(); + } +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExampleLookupActor(TExampleStorageConfig *config, const TString &key, TActorId replyTo) + : Config(config) + , Key(key) + , ReplyTo(replyTo) + {} + + void Bootstrap() { + Y_VERIFY(Config->Replicas.size() > 0); + + TotalReplicas = Config->Replicas.size(); + RequestActors.reserve(TotalReplicas); + for (const auto &replica : Config->Replicas) { + const TActorId requestActor = Register(new TExampleLookupRequestActor(SelfId(), replica, Key)); + RequestActors.emplace_back(requestActor); + } + + Become(&TThis::StateWork); + } + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(TEvExample::TEvReplicaInfo, Handle); + default: + break; + } + } +}; + +IActor* CreateLookupActor(TExampleStorageConfig *config, const TString &key, TActorId replyTo) { + return new TExampleLookupActor(config, key, replyTo); +} diff --git a/library/cpp/actors/examples/02_discovery/main.cpp b/library/cpp/actors/examples/02_discovery/main.cpp new file mode 100644 index 0000000000..379fd6de84 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/main.cpp @@ -0,0 +1,136 @@ +#include "services.h" + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/executor_pool_basic.h> +#include <library/cpp/actors/core/scheduler_basic.h> +#include <library/cpp/actors/core/log.h> +#include <library/cpp/actors/dnsresolver/dnsresolver.h> +#include <library/cpp/actors/interconnect/interconnect.h> +#include <library/cpp/actors/interconnect/interconnect_common.h> +#include <library/cpp/actors/interconnect/interconnect_tcp_proxy.h> +#include <library/cpp/actors/interconnect/interconnect_tcp_server.h> +#include <library/cpp/actors/interconnect/poller_actor.h> +#include <library/cpp/actors/interconnect/poller_tcp.h> +#include <library/cpp/actors/util/should_continue.h> + +#include <util/system/sigset.h> +#include <util/generic/xrange.h> + +using namespace NActors; +using namespace NActors::NDnsResolver; + +static const ui32 CfgTotalReplicaNodes = 5; +static const ui16 CfgBasePort = 13300; +static const ui16 CfgHttpPort = 8881; +static const TString PublishKey = "endpoint"; + +static TProgramShouldContinue ShouldContinue; + +void OnTerminate(int) { + ShouldContinue.ShouldStop(); +} + +THolder<TActorSystemSetup> BuildActorSystemSetup(ui32 nodeId, ui32 threads, NMonitoring::TDynamicCounters &counters) { + Y_VERIFY(threads > 0 && threads < 100); + + auto setup = MakeHolder<TActorSystemSetup>(); + + setup->NodeId = nodeId; + + setup->ExecutorsCount = 1; + setup->Executors.Reset(new TAutoPtr<IExecutorPool>[1]); + setup->Executors[0] = new TBasicExecutorPool(0, threads, 50); + setup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 0)); + + setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0)); + + TIntrusivePtr<TTableNameserverSetup> nameserverTable = new TTableNameserverSetup(); + for (ui32 xnode : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) { + nameserverTable->StaticNodeTable[xnode] = std::make_pair("127.0.0.1", CfgBasePort + xnode); + } + + setup->LocalServices.emplace_back( + MakeDnsResolverActorId(), + TActorSetupCmd(CreateOnDemandDnsResolver(), TMailboxType::ReadAsFilled, 0) + ); + + setup->LocalServices.emplace_back( + GetNameserviceActorId(), + TActorSetupCmd(CreateNameserverTable(nameserverTable), TMailboxType::ReadAsFilled, 0) + ); + + TIntrusivePtr<TInterconnectProxyCommon> icCommon = new TInterconnectProxyCommon(); + icCommon->NameserviceId = GetNameserviceActorId(); + icCommon->MonCounters = counters.GetSubgroup("counters", "interconnect"); + icCommon->TechnicalSelfHostName = "127.0.0.1"; + + setup->Interconnect.ProxyActors.resize(CfgTotalReplicaNodes + 1); + for (ui32 xnode : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) { + if (xnode != nodeId) { + IActor *actor = new TInterconnectProxyTCP(xnode, icCommon); + setup->Interconnect.ProxyActors[xnode] = TActorSetupCmd(actor, TMailboxType::ReadAsFilled, 0); + } + else { + IActor *listener = new TInterconnectListenerTCP("127.0.0.1", CfgBasePort + xnode, icCommon); + setup->LocalServices.emplace_back( + MakeInterconnectListenerActorId(false), + TActorSetupCmd(listener, TMailboxType::ReadAsFilled, 0) + ); + } + } + + return setup; +} + +int main(int argc, char **argv) { + Y_UNUSED(argc); + Y_UNUSED(argv); + +#ifdef _unix_ + signal(SIGPIPE, SIG_IGN); +#endif + signal(SIGINT, &OnTerminate); + signal(SIGTERM, &OnTerminate); + + TIntrusivePtr<TExampleStorageConfig> config = new TExampleStorageConfig(); + for (ui32 nodeid : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) { + config->Replicas.push_back(MakeReplicaId(nodeid)); + } + + TVector<THolder<TActorSystem>> actorSystemHolder; + TVector<TIntrusivePtr<NMonitoring::TDynamicCounters>> countersHolder; + for (ui32 nodeid : xrange<ui32>(1, CfgTotalReplicaNodes + 1)) { + countersHolder.emplace_back(new NMonitoring::TDynamicCounters()); + THolder<TActorSystemSetup> actorSystemSetup = BuildActorSystemSetup(nodeid, 2, *countersHolder.back()); + actorSystemSetup->LocalServices.emplace_back( + TActorId(), + TActorSetupCmd(CreateEndpointActor(config.Get(), PublishKey, CfgHttpPort + nodeid), TMailboxType::HTSwap, 0) + ); + + actorSystemSetup->LocalServices.emplace_back( + MakeReplicaId(nodeid), + TActorSetupCmd(CreateReplica(), TMailboxType::ReadAsFilled, 0) + ); + + actorSystemHolder.emplace_back(new TActorSystem(actorSystemSetup)); + } + + for (auto &xh : actorSystemHolder) + xh->Start(); + + while (ShouldContinue.PollState() == TProgramShouldContinue::Continue) { + Sleep(TDuration::MilliSeconds(200)); + } + + // stop actorsystem to not generate new reqeusts for external services + // no events would be processed anymore + for (auto &xh : actorSystemHolder) + xh->Stop(); + + // and then cleanup actorsystem + // from this moment working with actorsystem prohibited + for (auto &xh : actorSystemHolder) + xh->Cleanup(); + + return ShouldContinue.GetReturnCode(); +} diff --git a/library/cpp/actors/examples/02_discovery/protocol.proto b/library/cpp/actors/examples/02_discovery/protocol.proto new file mode 100644 index 0000000000..41cc2cc9c8 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/protocol.proto @@ -0,0 +1,19 @@ +package NActorsExample; + +message TEvReplicaInfo { + optional string Key = 1; + repeated string Payload = 2; +}; + +message TEvReplicaPublishAck { + optional string Key = 1; +}; + +message TEvReplicaPublish { + optional string Key = 1; + optional string Payload = 2; +}; + +message TEvReplicaLookup { + optional string Key = 1; +}; diff --git a/library/cpp/actors/examples/02_discovery/publish.cpp b/library/cpp/actors/examples/02_discovery/publish.cpp new file mode 100644 index 0000000000..8dc5fbcea4 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/publish.cpp @@ -0,0 +1,113 @@ +#include "services.h" + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/interconnect.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <util/generic/set.h> +#include <util/generic/vector.h> + +class TExamplePublishReplicaActor : public TActorBootstrapped<TExamplePublishReplicaActor> { + const TActorId Owner; + const TActorId Replica; + const TString Key; + const TString Payload; + + void PassAway() override { + const ui32 replicaNode = Replica.NodeId(); + if (replicaNode != SelfId().NodeId()) { + const TActorId &interconnectProxy = TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(Replica.NodeId()); + Send(interconnectProxy, new TEvents::TEvUnsubscribe()); + } + return IActor::PassAway(); + } + + void SomeSleep() { + Become(&TThis::StateSleep, TDuration::MilliSeconds(250), new TEvents::TEvWakeup()); + } +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExamplePublishReplicaActor(TActorId owner, TActorId replica, const TString &key, const TString &payload) + : Owner(owner) + , Replica(replica) + , Key(key) + , Payload(payload) + {} + + void Bootstrap() { + const ui32 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession; + Send(Replica, new TEvExample::TEvReplicaPublish(Key, Payload), flags); + Become(&TThis::StatePublish); + } + + STFUNC(StatePublish) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + cFunc(TEvents::TEvPoison::EventType, PassAway); + cFunc(TEvents::TEvUndelivered::EventType, SomeSleep); + cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, SomeSleep); + default: + break; + } + } + + STFUNC(StateSleep) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + cFunc(TEvents::TEvPoison::EventType, PassAway); + cFunc(TEvents::TEvWakeup::EventType, Bootstrap); + default: + break; + } + } +}; + +class TExamplePublishActor : public TActorBootstrapped<TExamplePublishActor> { + TIntrusiveConstPtr<TExampleStorageConfig> Config; + const TString Key; + const TString Payload; + TVector<TActorId> PublishActors; + + void PassAway() override { + for (const auto &x : PublishActors) + Send(x, new TEvents::TEvPoison()); + return IActor::PassAway(); + } +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExamplePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what) + : Config(config) + , Key(key) + , Payload(what) + {} + + void Bootstrap() { + for (auto &replica : Config->Replicas) { + const TActorId x = Register(new TExamplePublishReplicaActor(SelfId(), replica, Key, Payload)); + PublishActors.emplace_back(x); + } + + Become(&TThis::StateWork); + } + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + cFunc(TEvents::TEvPoison::EventType, PassAway); + default: + break; + } + } +}; + +IActor* CreatePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what) { + return new TExamplePublishActor(config, key, what); +} diff --git a/library/cpp/actors/examples/02_discovery/replica.cpp b/library/cpp/actors/examples/02_discovery/replica.cpp new file mode 100644 index 0000000000..74fdfc1910 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/replica.cpp @@ -0,0 +1,182 @@ +#include "services.h" +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/interconnect.h> +#include <util/generic/set.h> +#include <util/generic/hash_set.h> +#include <util/generic/vector.h> + +class TExampleReplicaActor : public TActor<TExampleReplicaActor> { + using TOwnerIndex = TMap<TActorId, ui32, TActorId::TOrderedCmp>; + using TKeyIndex = THashMap<TString, TSet<ui32>>; + + struct TEntry { + TString Payload; + TActorId Owner; + TOwnerIndex::iterator OwnerIt; + TKeyIndex::iterator KeyIt; + }; + + TVector<TEntry> Entries; + TVector<ui32> AvailableEntries; + + TOwnerIndex IndexOwner; + TKeyIndex IndexKey; + + ui32 AllocateEntry() { + ui32 ret; + if (AvailableEntries) { + ret = AvailableEntries.back(); + AvailableEntries.pop_back(); + } + else { + ret = Entries.size(); + Entries.emplace_back(); + } + + return ret; + } + + bool IsLastEntryOnNode(TOwnerIndex::iterator ownerIt) { + const ui32 ownerNodeId = ownerIt->first.NodeId(); + if (ownerIt != IndexOwner.begin()) { + auto x = ownerIt; + --x; + if (x->first.NodeId() == ownerNodeId) + return false; + } + + ++ownerIt; + if (ownerIt != IndexOwner.end()) { + if (ownerIt->first.NodeId() == ownerNodeId) + return false; + } + + return true; + } + + void CleanupEntry(ui32 entryIndex) { + TEntry &entry = Entries[entryIndex]; + entry.KeyIt->second.erase(entryIndex); + if (entry.KeyIt->second.empty()) + IndexKey.erase(entry.KeyIt); + + if (IsLastEntryOnNode(entry.OwnerIt)) + Send(TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(entry.OwnerIt->first.NodeId()), new TEvents::TEvUnsubscribe()); + + IndexOwner.erase(entry.OwnerIt); + + TString().swap(entry.Payload); + entry.Owner = TActorId(); + entry.KeyIt = IndexKey.end(); + entry.OwnerIt = IndexOwner.end(); + + AvailableEntries.emplace_back(entryIndex); + } + + void Handle(TEvExample::TEvReplicaLookup::TPtr &ev) { + auto &record = ev->Get()->Record; + const auto &key = record.GetKey(); + + auto keyIt = IndexKey.find(key); + if (keyIt == IndexKey.end()) { + Send(ev->Sender, new TEvExample::TEvReplicaInfo(key), 0, ev->Cookie); + return; + } + + auto reply = MakeHolder<TEvExample::TEvReplicaInfo>(key); + reply->Record.MutablePayload()->Reserve(keyIt->second.size()); + for (ui32 entryIndex : keyIt->second) { + const TEntry &entry = Entries[entryIndex]; + reply->Record.AddPayload(entry.Payload); + } + + Send(ev->Sender, std::move(reply), 0, ev->Cookie); + } + + void Handle(TEvExample::TEvReplicaPublish::TPtr &ev) { + auto &record = ev->Get()->Record; + const TString &key = record.GetKey(); + const TString &payload = record.GetPayload(); + const TActorId &owner = ev->Sender; + + auto ownerIt = IndexOwner.find(owner); + if (ownerIt != IndexOwner.end()) { + const ui32 entryIndex = ownerIt->second; + TEntry &entry = Entries[entryIndex]; + if (entry.KeyIt->first != key) { + // reply nothing, request suspicious + return; + } + + entry.Payload = payload; + } + else { + const ui32 entryIndex = AllocateEntry(); + TEntry &entry = Entries[entryIndex]; + + entry.Payload = payload; + entry.Owner = owner; + + entry.OwnerIt = IndexOwner.emplace(owner, entryIndex).first; + entry.KeyIt = IndexKey.emplace(std::make_pair(key, TSet<ui32>())).first; + entry.KeyIt->second.emplace(entryIndex); + + Send(owner, new TEvExample::TEvReplicaPublishAck(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, ev->Cookie); + } + } + + void Handle(TEvents::TEvUndelivered::TPtr &ev) { + auto ownerIt = IndexOwner.find(ev->Sender); + if (ownerIt == IndexOwner.end()) + return; + + CleanupEntry(ownerIt->second); + } + + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) { + auto *msg = ev->Get(); + const ui32 nodeId = msg->NodeId; + auto ownerIt = IndexOwner.lower_bound(TActorId(nodeId, 0, 0, 0)); + while (ownerIt != IndexOwner.end() && ownerIt->first.NodeId() == nodeId) { + const ui32 idx = ownerIt->second; + ++ownerIt; + CleanupEntry(idx); + } + } + +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExampleReplicaActor() + : TActor(&TThis::StateWork) + {} + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(TEvExample::TEvReplicaLookup, Handle); + hFunc(TEvExample::TEvReplicaPublish, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + + IgnoreFunc(TEvInterconnect::TEvNodeConnected); + default: + // here is place to spam some log message on unknown events + break; + } + } +}; + +IActor* CreateReplica() { + return new TExampleReplicaActor(); +} + +TActorId MakeReplicaId(ui32 nodeid) { + char x[12] = { 'r', 'p', 'l' }; + memcpy(x + 5, &nodeid, sizeof(ui32)); + return TActorId(nodeid, TStringBuf(x, 12)); +} diff --git a/library/cpp/actors/examples/02_discovery/services.h b/library/cpp/actors/examples/02_discovery/services.h new file mode 100644 index 0000000000..266517c577 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/services.h @@ -0,0 +1,85 @@ +#pragma once +#include <library/cpp/actors/examples/02_discovery/protocol.pb.h> + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/event_pb.h> +#include <library/cpp/actors/core/event_local.h> + +#include <util/generic/vector.h> + +using namespace NActors; + +struct TExampleStorageConfig : public TThrRefBase { + TVector<TActorId> Replicas; +}; + +struct TEvExample { + enum EEv { + EvReplicaLookup = EventSpaceBegin(TEvents::ES_USERSPACE + 1), + EvReplicaPublish, + + EvReplicaInfo = EventSpaceBegin(TEvents::ES_USERSPACE + 2), + EvReplicaPublishAck, + + EvInfo = EventSpaceBegin(TEvents::ES_USERSPACE + 3), + }; + + struct TEvReplicaLookup : public TEventPB<TEvReplicaLookup, NActorsExample::TEvReplicaLookup, EvReplicaLookup> { + TEvReplicaLookup() + {} + + TEvReplicaLookup(const TString &key) + { + Record.SetKey(key); + } + }; + + struct TEvReplicaPublish : public TEventPB<TEvReplicaPublish, NActorsExample::TEvReplicaPublish, EvReplicaPublish> { + TEvReplicaPublish() + {} + + TEvReplicaPublish(const TString &key, const TString &payload) + { + Record.SetKey(key); + Record.SetPayload(payload); + } + }; + + struct TEvReplicaInfo : public TEventPB<TEvReplicaInfo, NActorsExample::TEvReplicaInfo, EvReplicaInfo> { + TEvReplicaInfo() + {} + + TEvReplicaInfo(const TString &key) + { + Record.SetKey(key); + } + }; + + struct TEvReplicaPublishAck : public TEventPB<TEvReplicaPublishAck, NActorsExample::TEvReplicaPublishAck, EvReplicaPublishAck> { + TEvReplicaPublishAck() + {} + + TEvReplicaPublishAck(const TString &key) + { + Record.SetKey(key); + } + }; + + struct TEvInfo : public TEventLocal<TEvInfo, EvInfo> { + const TString Key; + const TVector<TString> Payloads; + + TEvInfo(const TString &key, TVector<TString> &&payloads) + : Key(key) + , Payloads(payloads) + {} + }; +}; + +IActor* CreateReplica(); +IActor* CreatePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what); +IActor* CreateLookupActor(TExampleStorageConfig *config, const TString &key, TActorId replyTo); +IActor* CreateEndpointActor(TExampleStorageConfig *config, const TString &publishKey, ui16 httpPort); + +TActorId MakeReplicaId(ui32 nodeid); |