From 056bb284ccf8dd6793ec3a54ffa36c4fb2b9ad11 Mon Sep 17 00:00:00 2001 From: alexv-smirnov Date: Wed, 15 Mar 2023 19:59:12 +0300 Subject: add library/cpp/actors, ymake build to ydb oss export --- .../01_ping_pong/CMakeLists.darwin-x86_64.txt | 29 ++++ .../01_ping_pong/CMakeLists.linux-aarch64.txt | 31 ++++ .../01_ping_pong/CMakeLists.linux-x86_64.txt | 32 ++++ .../actors/examples/01_ping_pong/CMakeLists.txt | 17 ++ .../01_ping_pong/CMakeLists.windows-x86_64.txt | 21 +++ library/cpp/actors/examples/01_ping_pong/main.cpp | 131 +++++++++++++++ library/cpp/actors/examples/01_ping_pong/ya.make | 13 ++ .../02_discovery/CMakeLists.darwin-x86_64.txt | 65 ++++++++ .../02_discovery/CMakeLists.linux-aarch64.txt | 67 ++++++++ .../02_discovery/CMakeLists.linux-x86_64.txt | 68 ++++++++ .../actors/examples/02_discovery/CMakeLists.txt | 17 ++ .../02_discovery/CMakeLists.windows-x86_64.txt | 57 +++++++ .../cpp/actors/examples/02_discovery/endpoint.cpp | 118 +++++++++++++ .../cpp/actors/examples/02_discovery/lookup.cpp | 134 +++++++++++++++ library/cpp/actors/examples/02_discovery/main.cpp | 136 +++++++++++++++ .../actors/examples/02_discovery/protocol.proto | 19 +++ .../cpp/actors/examples/02_discovery/publish.cpp | 113 +++++++++++++ .../cpp/actors/examples/02_discovery/replica.cpp | 182 +++++++++++++++++++++ .../cpp/actors/examples/02_discovery/services.h | 85 ++++++++++ library/cpp/actors/examples/02_discovery/ya.make | 25 +++ library/cpp/actors/examples/CMakeLists.txt | 10 ++ library/cpp/actors/examples/ya.make | 4 + 22 files changed, 1374 insertions(+) create mode 100644 library/cpp/actors/examples/01_ping_pong/CMakeLists.darwin-x86_64.txt create mode 100644 library/cpp/actors/examples/01_ping_pong/CMakeLists.linux-aarch64.txt create mode 100644 library/cpp/actors/examples/01_ping_pong/CMakeLists.linux-x86_64.txt create mode 100644 library/cpp/actors/examples/01_ping_pong/CMakeLists.txt create mode 100644 library/cpp/actors/examples/01_ping_pong/CMakeLists.windows-x86_64.txt create mode 100644 library/cpp/actors/examples/01_ping_pong/main.cpp create mode 100644 library/cpp/actors/examples/01_ping_pong/ya.make create mode 100644 library/cpp/actors/examples/02_discovery/CMakeLists.darwin-x86_64.txt create mode 100644 library/cpp/actors/examples/02_discovery/CMakeLists.linux-aarch64.txt create mode 100644 library/cpp/actors/examples/02_discovery/CMakeLists.linux-x86_64.txt create mode 100644 library/cpp/actors/examples/02_discovery/CMakeLists.txt create mode 100644 library/cpp/actors/examples/02_discovery/CMakeLists.windows-x86_64.txt create mode 100644 library/cpp/actors/examples/02_discovery/endpoint.cpp create mode 100644 library/cpp/actors/examples/02_discovery/lookup.cpp create mode 100644 library/cpp/actors/examples/02_discovery/main.cpp create mode 100644 library/cpp/actors/examples/02_discovery/protocol.proto create mode 100644 library/cpp/actors/examples/02_discovery/publish.cpp create mode 100644 library/cpp/actors/examples/02_discovery/replica.cpp create mode 100644 library/cpp/actors/examples/02_discovery/services.h create mode 100644 library/cpp/actors/examples/02_discovery/ya.make create mode 100644 library/cpp/actors/examples/CMakeLists.txt create mode 100644 library/cpp/actors/examples/ya.make (limited to 'library/cpp/actors/examples') diff --git a/library/cpp/actors/examples/01_ping_pong/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/examples/01_ping_pong/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..1d1c02036d6 --- /dev/null +++ b/library/cpp/actors/examples/01_ping_pong/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,29 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(example_01_ping_pong) +target_link_libraries(example_01_ping_pong PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + library-cpp-cpuid_check + cpp-actors-core +) +target_link_options(example_01_ping_pong PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(example_01_ping_pong PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/01_ping_pong/main.cpp +) +vcs_info(example_01_ping_pong) diff --git a/library/cpp/actors/examples/01_ping_pong/CMakeLists.linux-aarch64.txt b/library/cpp/actors/examples/01_ping_pong/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..d37268db3b7 --- /dev/null +++ b/library/cpp/actors/examples/01_ping_pong/CMakeLists.linux-aarch64.txt @@ -0,0 +1,31 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(example_01_ping_pong) +target_link_libraries(example_01_ping_pong PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + cpp-actors-core +) +target_link_options(example_01_ping_pong PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(example_01_ping_pong PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/01_ping_pong/main.cpp +) +vcs_info(example_01_ping_pong) diff --git a/library/cpp/actors/examples/01_ping_pong/CMakeLists.linux-x86_64.txt b/library/cpp/actors/examples/01_ping_pong/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..79067f5ead7 --- /dev/null +++ b/library/cpp/actors/examples/01_ping_pong/CMakeLists.linux-x86_64.txt @@ -0,0 +1,32 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(example_01_ping_pong) +target_link_libraries(example_01_ping_pong PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + library-cpp-cpuid_check + cpp-actors-core +) +target_link_options(example_01_ping_pong PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(example_01_ping_pong PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/01_ping_pong/main.cpp +) +vcs_info(example_01_ping_pong) diff --git a/library/cpp/actors/examples/01_ping_pong/CMakeLists.txt b/library/cpp/actors/examples/01_ping_pong/CMakeLists.txt new file mode 100644 index 00000000000..d90657116d0 --- /dev/null +++ b/library/cpp/actors/examples/01_ping_pong/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/library/cpp/actors/examples/01_ping_pong/CMakeLists.windows-x86_64.txt b/library/cpp/actors/examples/01_ping_pong/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..af8b315ee34 --- /dev/null +++ b/library/cpp/actors/examples/01_ping_pong/CMakeLists.windows-x86_64.txt @@ -0,0 +1,21 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(example_01_ping_pong) +target_link_libraries(example_01_ping_pong PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + library-cpp-cpuid_check + cpp-actors-core +) +target_sources(example_01_ping_pong PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/01_ping_pong/main.cpp +) +vcs_info(example_01_ping_pong) diff --git a/library/cpp/actors/examples/01_ping_pong/main.cpp b/library/cpp/actors/examples/01_ping_pong/main.cpp new file mode 100644 index 00000000000..ccb73f7d458 --- /dev/null +++ b/library/cpp/actors/examples/01_ping_pong/main.cpp @@ -0,0 +1,131 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace NActors; + +static TProgramShouldContinue ShouldContinue; + +void OnTerminate(int) { + ShouldContinue.ShouldStop(); +} + +class TPingActor : public TActorBootstrapped { + const TActorId Target; + ui64 HandledEvents; + TInstant PeriodStart; + + void Handle(TEvents::TEvPing::TPtr &ev) { + Send(ev->Sender, new TEvents::TEvPong()); + Send(ev->Sender, new TEvents::TEvPing()); + Become(&TThis::StatePing); + } + + void Handle(TEvents::TEvPong::TPtr &ev) { + Y_UNUSED(ev); + Become(&TThis::StateWait); + } + + void PrintStats() { + const i64 ms = (TInstant::Now() - PeriodStart).MilliSeconds(); + Cout << "Handled " << 2 * HandledEvents << " over " << ms << "ms" << Endl; + ScheduleStats(); + } + + void ScheduleStats() { + HandledEvents = 0; + PeriodStart = TInstant::Now(); + Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup()); + } + +public: + TPingActor(TActorId target) + : Target(target) + , HandledEvents(0) + , PeriodStart(TInstant::Now()) + {} + + STFUNC(StateWait) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(TEvents::TEvPing, Handle); + cFunc(TEvents::TEvWakeup::EventType, PrintStats); + } + + ++HandledEvents; + } + + STFUNC(StatePing) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(TEvents::TEvPong, Handle); + cFunc(TEvents::TEvWakeup::EventType, PrintStats); + } + + ++HandledEvents; + } + + void Bootstrap() { + if (Target) { + Become(&TThis::StatePing); + Send(Target, new TEvents::TEvPing()); + ScheduleStats(); + } + else { + Become(&TThis::StateWait); + }; + } +}; + +THolder BuildActorSystemSetup(ui32 threads, ui32 pools) { + Y_VERIFY(threads > 0 && threads < 100); + Y_VERIFY(pools > 0 && pools < 10); + + auto setup = MakeHolder(); + + setup->NodeId = 1; + + setup->ExecutorsCount = pools; + setup->Executors.Reset(new TAutoPtr[pools]); + for (ui32 idx : xrange(pools)) { + setup->Executors[idx] = new TBasicExecutorPool(idx, threads, 50); + } + + setup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 0)); + + return setup; +} + +int main(int argc, char **argv) { + Y_UNUSED(argc); + Y_UNUSED(argv); + +#ifdef _unix_ + signal(SIGPIPE, SIG_IGN); +#endif + signal(SIGINT, &OnTerminate); + signal(SIGTERM, &OnTerminate); + + THolder actorSystemSetup = BuildActorSystemSetup(2, 1); + TActorSystem actorSystem(actorSystemSetup); + + actorSystem.Start(); + + const TActorId a = actorSystem.Register(new TPingActor(TActorId())); + const TActorId b = actorSystem.Register(new TPingActor(a)); + Y_UNUSED(b); + + while (ShouldContinue.PollState() == TProgramShouldContinue::Continue) { + Sleep(TDuration::MilliSeconds(200)); + } + + actorSystem.Stop(); + actorSystem.Cleanup(); + + return ShouldContinue.GetReturnCode(); +} diff --git a/library/cpp/actors/examples/01_ping_pong/ya.make b/library/cpp/actors/examples/01_ping_pong/ya.make new file mode 100644 index 00000000000..d33cfd3456c --- /dev/null +++ b/library/cpp/actors/examples/01_ping_pong/ya.make @@ -0,0 +1,13 @@ +PROGRAM(example_01_ping_pong) + +ALLOCATOR(LF) + +SRCS( + main.cpp +) + +PEERDIR( + library/cpp/actors/core +) + +END() diff --git a/library/cpp/actors/examples/02_discovery/CMakeLists.darwin-x86_64.txt b/library/cpp/actors/examples/02_discovery/CMakeLists.darwin-x86_64.txt new file mode 100644 index 00000000000..2239d2b5495 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,65 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) + +add_executable(example_02_discovery) +target_link_libraries(example_02_discovery PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + library-cpp-cpuid_check + cpp-actors-core + cpp-actors-dnsresolver + cpp-actors-interconnect + cpp-actors-http + contrib-libs-protobuf +) +target_link_options(example_02_discovery PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_proto_messages(example_02_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/protocol.proto +) +target_sources(example_02_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/endpoint.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/lookup.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/main.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/publish.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/replica.cpp +) +target_proto_addincls(example_02_discovery + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(example_02_discovery + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +vcs_info(example_02_discovery) diff --git a/library/cpp/actors/examples/02_discovery/CMakeLists.linux-aarch64.txt b/library/cpp/actors/examples/02_discovery/CMakeLists.linux-aarch64.txt new file mode 100644 index 00000000000..9f53b255761 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/CMakeLists.linux-aarch64.txt @@ -0,0 +1,67 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) + +add_executable(example_02_discovery) +target_link_libraries(example_02_discovery PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + cpp-actors-core + cpp-actors-dnsresolver + cpp-actors-interconnect + cpp-actors-http + contrib-libs-protobuf +) +target_link_options(example_02_discovery PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_proto_messages(example_02_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/protocol.proto +) +target_sources(example_02_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/endpoint.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/lookup.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/main.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/publish.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/replica.cpp +) +target_proto_addincls(example_02_discovery + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(example_02_discovery + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +vcs_info(example_02_discovery) diff --git a/library/cpp/actors/examples/02_discovery/CMakeLists.linux-x86_64.txt b/library/cpp/actors/examples/02_discovery/CMakeLists.linux-x86_64.txt new file mode 100644 index 00000000000..934f3d6b086 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/CMakeLists.linux-x86_64.txt @@ -0,0 +1,68 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) + +add_executable(example_02_discovery) +target_link_libraries(example_02_discovery PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + library-cpp-cpuid_check + cpp-actors-core + cpp-actors-dnsresolver + cpp-actors-interconnect + cpp-actors-http + contrib-libs-protobuf +) +target_link_options(example_02_discovery PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_proto_messages(example_02_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/protocol.proto +) +target_sources(example_02_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/endpoint.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/lookup.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/main.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/publish.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/replica.cpp +) +target_proto_addincls(example_02_discovery + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(example_02_discovery + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +vcs_info(example_02_discovery) diff --git a/library/cpp/actors/examples/02_discovery/CMakeLists.txt b/library/cpp/actors/examples/02_discovery/CMakeLists.txt new file mode 100644 index 00000000000..d90657116d0 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND UNIX AND NOT APPLE AND NOT ANDROID) + include(CMakeLists.linux-aarch64.txt) +elseif (APPLE AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND UNIX AND NOT APPLE AND NOT ANDROID AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/library/cpp/actors/examples/02_discovery/CMakeLists.windows-x86_64.txt b/library/cpp/actors/examples/02_discovery/CMakeLists.windows-x86_64.txt new file mode 100644 index 00000000000..0e4c17401b4 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/CMakeLists.windows-x86_64.txt @@ -0,0 +1,57 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +get_built_tool_path( + TOOL_protoc_bin + TOOL_protoc_dependency + contrib/tools/protoc/bin + protoc +) +get_built_tool_path( + TOOL_cpp_styleguide_bin + TOOL_cpp_styleguide_dependency + contrib/tools/protoc/plugins/cpp_styleguide + cpp_styleguide +) + +add_executable(example_02_discovery) +target_link_libraries(example_02_discovery PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + library-cpp-cpuid_check + cpp-actors-core + cpp-actors-dnsresolver + cpp-actors-interconnect + cpp-actors-http + contrib-libs-protobuf +) +target_proto_messages(example_02_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/protocol.proto +) +target_sources(example_02_discovery PRIVATE + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/endpoint.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/lookup.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/main.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/publish.cpp + ${CMAKE_SOURCE_DIR}/library/cpp/actors/examples/02_discovery/replica.cpp +) +target_proto_addincls(example_02_discovery + ./ + ${CMAKE_SOURCE_DIR}/ + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/contrib/libs/protobuf/src +) +target_proto_outs(example_02_discovery + --cpp_out=${CMAKE_BINARY_DIR}/ + --cpp_styleguide_out=${CMAKE_BINARY_DIR}/ +) +vcs_info(example_02_discovery) diff --git a/library/cpp/actors/examples/02_discovery/endpoint.cpp b/library/cpp/actors/examples/02_discovery/endpoint.cpp new file mode 100644 index 00000000000..97780e8b4c7 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/endpoint.cpp @@ -0,0 +1,118 @@ +#include "services.h" + +#include +#include +#include + +#include +#include + +#include +#include + +class TExampleHttpRequest : public TActor { + TIntrusivePtr Config; + const TString PublishKey; + + TActorId HttpProxy; + NHttp::THttpIncomingRequestPtr Request; + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr &ev) { + Request = std::move(ev->Get()->Request); + HttpProxy = ev->Sender; + + Register(CreateLookupActor(Config.Get(), PublishKey, SelfId())); + } + + void Handle(TEvExample::TEvInfo::TPtr &ev) { + auto *msg = ev->Get(); + + TStringBuilder body; + for (const auto &x : msg->Payloads) + body << x << Endl; + + auto response = Request->CreateResponseOK(body, "application/text; charset=utf-8"); + Send(HttpProxy, new NHttp::TEvHttpProxy::TEvHttpOutgoingResponse(response)); + + PassAway(); + } +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExampleHttpRequest(TExampleStorageConfig *config, const TString &publishKey) + : TActor(&TThis::StateWork) + , Config(config) + , PublishKey(publishKey) + {} + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + hFunc(TEvExample::TEvInfo, Handle); + } + } +}; + +class TExampleHttpEndpoint : public TActorBootstrapped { + TIntrusivePtr Config; + const TString PublishKey; + const ui16 HttpPort; + + TActorId PublishActor; + TActorId HttpProxy; + + std::shared_ptr SensorsRegistry = std::make_shared(); + + void PassAway() override { + Send(PublishActor, new TEvents::TEvPoison()); + Send(HttpProxy, new TEvents::TEvPoison()); + + return TActor::PassAway(); + } + + void Handle(NHttp::TEvHttpProxy::TEvHttpIncomingRequest::TPtr &ev) { + const TActorId reqActor = Register(new TExampleHttpRequest(Config.Get(), PublishKey)); + TlsActivationContext->Send(ev->Forward(reqActor)); + } + +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExampleHttpEndpoint(TExampleStorageConfig *config, const TString &publishKey, ui16 port) + : Config(config) + , PublishKey(publishKey) + , HttpPort(port) + { + } + + void Bootstrap() { + const TString publishPayload = ToString(HttpPort); + PublishActor = Register(CreatePublishActor(Config.Get(), PublishKey, publishPayload)); + HttpProxy = Register(NHttp::CreateHttpProxy(SensorsRegistry)); + + Send(HttpProxy, new NHttp::TEvHttpProxy::TEvAddListeningPort(HttpPort, FQDNHostName())); + Send(HttpProxy, new NHttp::TEvHttpProxy::TEvRegisterHandler("/list", SelfId())); + + Become(&TThis::StateWork); + } + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(NHttp::TEvHttpProxy::TEvHttpIncomingRequest, Handle); + default: + break; + } + } +}; + +IActor* CreateEndpointActor(TExampleStorageConfig *config, const TString &publishKey, ui16 port) { + return new TExampleHttpEndpoint(config, publishKey, port); +} diff --git a/library/cpp/actors/examples/02_discovery/lookup.cpp b/library/cpp/actors/examples/02_discovery/lookup.cpp new file mode 100644 index 00000000000..469fba74fad --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/lookup.cpp @@ -0,0 +1,134 @@ +#include "services.h" + +#include +#include +#include +#include +#include +#include + +class TExampleLookupRequestActor : public TActor { + const TActorId Owner; + const TActorId Replica; + const TString Key; + + void Registered(TActorSystem* sys, const TActorId&) override { + const auto flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession; + sys->Send(new IEventHandleFat(Replica, SelfId(), new TEvExample::TEvReplicaLookup(Key), flags)); + } + + void PassAway() override { + const ui32 replicaNode = Replica.NodeId(); + if (replicaNode != SelfId().NodeId()) { + const TActorId &interconnectProxy = TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(Replica.NodeId()); + Send(interconnectProxy, new TEvents::TEvUnsubscribe()); + } + return IActor::PassAway(); + } + + void Handle(TEvExample::TEvReplicaInfo::TPtr &ev) { + Send(Owner, ev->Release().Release()); + return PassAway(); + } + + void HandleUndelivered() { + Send(Owner, new TEvExample::TEvReplicaInfo(Key)); + return PassAway(); + } +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExampleLookupRequestActor(TActorId owner, TActorId replica, const TString &key) + : TActor(&TThis::StateWork) + , Owner(owner) + , Replica(replica) + , Key(key) + {} + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(TEvExample::TEvReplicaInfo, Handle); + cFunc(TEvents::TEvUndelivered::EventType, HandleUndelivered); + cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, HandleUndelivered); + default: + break; + } + } +}; + +class TExampleLookupActor : public TActorBootstrapped { + TIntrusiveConstPtr Config; + const TString Key; + const TActorId ReplyTo; + TVector RequestActors; + + ui32 TotalReplicas = 0; + ui32 RepliedSuccess = 0; + ui32 RepliedError = 0; + + TSet Payloads; + + void Handle(TEvExample::TEvReplicaInfo::TPtr &ev) { + NActorsExample::TEvReplicaInfo &record = ev->Get()->Record; + if (record.PayloadSize()) { + ++RepliedSuccess; + for (const TString &payload : record.GetPayload()) { + Payloads.insert(payload); + } + } + else { + ++RepliedError; + } + + const ui32 majority = (TotalReplicas / 2 + 1); + if (RepliedSuccess == majority || (RepliedSuccess + RepliedError == TotalReplicas)) + return ReplyAndDie(); + } + + void ReplyAndDie() { + TVector replyPayloads(Payloads.begin(), Payloads.end()); + Send(ReplyTo, new TEvExample::TEvInfo(Key, std::move(replyPayloads))); + return PassAway(); + } +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExampleLookupActor(TExampleStorageConfig *config, const TString &key, TActorId replyTo) + : Config(config) + , Key(key) + , ReplyTo(replyTo) + {} + + void Bootstrap() { + Y_VERIFY(Config->Replicas.size() > 0); + + TotalReplicas = Config->Replicas.size(); + RequestActors.reserve(TotalReplicas); + for (const auto &replica : Config->Replicas) { + const TActorId requestActor = Register(new TExampleLookupRequestActor(SelfId(), replica, Key)); + RequestActors.emplace_back(requestActor); + } + + Become(&TThis::StateWork); + } + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(TEvExample::TEvReplicaInfo, Handle); + default: + break; + } + } +}; + +IActor* CreateLookupActor(TExampleStorageConfig *config, const TString &key, TActorId replyTo) { + return new TExampleLookupActor(config, key, replyTo); +} diff --git a/library/cpp/actors/examples/02_discovery/main.cpp b/library/cpp/actors/examples/02_discovery/main.cpp new file mode 100644 index 00000000000..379fd6de84d --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/main.cpp @@ -0,0 +1,136 @@ +#include "services.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +using namespace NActors; +using namespace NActors::NDnsResolver; + +static const ui32 CfgTotalReplicaNodes = 5; +static const ui16 CfgBasePort = 13300; +static const ui16 CfgHttpPort = 8881; +static const TString PublishKey = "endpoint"; + +static TProgramShouldContinue ShouldContinue; + +void OnTerminate(int) { + ShouldContinue.ShouldStop(); +} + +THolder BuildActorSystemSetup(ui32 nodeId, ui32 threads, NMonitoring::TDynamicCounters &counters) { + Y_VERIFY(threads > 0 && threads < 100); + + auto setup = MakeHolder(); + + setup->NodeId = nodeId; + + setup->ExecutorsCount = 1; + setup->Executors.Reset(new TAutoPtr[1]); + setup->Executors[0] = new TBasicExecutorPool(0, threads, 50); + setup->Scheduler = new TBasicSchedulerThread(TSchedulerConfig(512, 0)); + + setup->LocalServices.emplace_back(MakePollerActorId(), TActorSetupCmd(CreatePollerActor(), TMailboxType::ReadAsFilled, 0)); + + TIntrusivePtr nameserverTable = new TTableNameserverSetup(); + for (ui32 xnode : xrange(1, CfgTotalReplicaNodes + 1)) { + nameserverTable->StaticNodeTable[xnode] = std::make_pair("127.0.0.1", CfgBasePort + xnode); + } + + setup->LocalServices.emplace_back( + MakeDnsResolverActorId(), + TActorSetupCmd(CreateOnDemandDnsResolver(), TMailboxType::ReadAsFilled, 0) + ); + + setup->LocalServices.emplace_back( + GetNameserviceActorId(), + TActorSetupCmd(CreateNameserverTable(nameserverTable), TMailboxType::ReadAsFilled, 0) + ); + + TIntrusivePtr icCommon = new TInterconnectProxyCommon(); + icCommon->NameserviceId = GetNameserviceActorId(); + icCommon->MonCounters = counters.GetSubgroup("counters", "interconnect"); + icCommon->TechnicalSelfHostName = "127.0.0.1"; + + setup->Interconnect.ProxyActors.resize(CfgTotalReplicaNodes + 1); + for (ui32 xnode : xrange(1, CfgTotalReplicaNodes + 1)) { + if (xnode != nodeId) { + IActor *actor = new TInterconnectProxyTCP(xnode, icCommon); + setup->Interconnect.ProxyActors[xnode] = TActorSetupCmd(actor, TMailboxType::ReadAsFilled, 0); + } + else { + IActor *listener = new TInterconnectListenerTCP("127.0.0.1", CfgBasePort + xnode, icCommon); + setup->LocalServices.emplace_back( + MakeInterconnectListenerActorId(false), + TActorSetupCmd(listener, TMailboxType::ReadAsFilled, 0) + ); + } + } + + return setup; +} + +int main(int argc, char **argv) { + Y_UNUSED(argc); + Y_UNUSED(argv); + +#ifdef _unix_ + signal(SIGPIPE, SIG_IGN); +#endif + signal(SIGINT, &OnTerminate); + signal(SIGTERM, &OnTerminate); + + TIntrusivePtr config = new TExampleStorageConfig(); + for (ui32 nodeid : xrange(1, CfgTotalReplicaNodes + 1)) { + config->Replicas.push_back(MakeReplicaId(nodeid)); + } + + TVector> actorSystemHolder; + TVector> countersHolder; + for (ui32 nodeid : xrange(1, CfgTotalReplicaNodes + 1)) { + countersHolder.emplace_back(new NMonitoring::TDynamicCounters()); + THolder actorSystemSetup = BuildActorSystemSetup(nodeid, 2, *countersHolder.back()); + actorSystemSetup->LocalServices.emplace_back( + TActorId(), + TActorSetupCmd(CreateEndpointActor(config.Get(), PublishKey, CfgHttpPort + nodeid), TMailboxType::HTSwap, 0) + ); + + actorSystemSetup->LocalServices.emplace_back( + MakeReplicaId(nodeid), + TActorSetupCmd(CreateReplica(), TMailboxType::ReadAsFilled, 0) + ); + + actorSystemHolder.emplace_back(new TActorSystem(actorSystemSetup)); + } + + for (auto &xh : actorSystemHolder) + xh->Start(); + + while (ShouldContinue.PollState() == TProgramShouldContinue::Continue) { + Sleep(TDuration::MilliSeconds(200)); + } + + // stop actorsystem to not generate new reqeusts for external services + // no events would be processed anymore + for (auto &xh : actorSystemHolder) + xh->Stop(); + + // and then cleanup actorsystem + // from this moment working with actorsystem prohibited + for (auto &xh : actorSystemHolder) + xh->Cleanup(); + + return ShouldContinue.GetReturnCode(); +} diff --git a/library/cpp/actors/examples/02_discovery/protocol.proto b/library/cpp/actors/examples/02_discovery/protocol.proto new file mode 100644 index 00000000000..41cc2cc9c82 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/protocol.proto @@ -0,0 +1,19 @@ +package NActorsExample; + +message TEvReplicaInfo { + optional string Key = 1; + repeated string Payload = 2; +}; + +message TEvReplicaPublishAck { + optional string Key = 1; +}; + +message TEvReplicaPublish { + optional string Key = 1; + optional string Payload = 2; +}; + +message TEvReplicaLookup { + optional string Key = 1; +}; diff --git a/library/cpp/actors/examples/02_discovery/publish.cpp b/library/cpp/actors/examples/02_discovery/publish.cpp new file mode 100644 index 00000000000..8dc5fbcea47 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/publish.cpp @@ -0,0 +1,113 @@ +#include "services.h" + +#include +#include +#include +#include +#include +#include + +class TExamplePublishReplicaActor : public TActorBootstrapped { + const TActorId Owner; + const TActorId Replica; + const TString Key; + const TString Payload; + + void PassAway() override { + const ui32 replicaNode = Replica.NodeId(); + if (replicaNode != SelfId().NodeId()) { + const TActorId &interconnectProxy = TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(Replica.NodeId()); + Send(interconnectProxy, new TEvents::TEvUnsubscribe()); + } + return IActor::PassAway(); + } + + void SomeSleep() { + Become(&TThis::StateSleep, TDuration::MilliSeconds(250), new TEvents::TEvWakeup()); + } +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExamplePublishReplicaActor(TActorId owner, TActorId replica, const TString &key, const TString &payload) + : Owner(owner) + , Replica(replica) + , Key(key) + , Payload(payload) + {} + + void Bootstrap() { + const ui32 flags = IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession; + Send(Replica, new TEvExample::TEvReplicaPublish(Key, Payload), flags); + Become(&TThis::StatePublish); + } + + STFUNC(StatePublish) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + cFunc(TEvents::TEvPoison::EventType, PassAway); + cFunc(TEvents::TEvUndelivered::EventType, SomeSleep); + cFunc(TEvInterconnect::TEvNodeDisconnected::EventType, SomeSleep); + default: + break; + } + } + + STFUNC(StateSleep) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + cFunc(TEvents::TEvPoison::EventType, PassAway); + cFunc(TEvents::TEvWakeup::EventType, Bootstrap); + default: + break; + } + } +}; + +class TExamplePublishActor : public TActorBootstrapped { + TIntrusiveConstPtr Config; + const TString Key; + const TString Payload; + TVector PublishActors; + + void PassAway() override { + for (const auto &x : PublishActors) + Send(x, new TEvents::TEvPoison()); + return IActor::PassAway(); + } +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExamplePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what) + : Config(config) + , Key(key) + , Payload(what) + {} + + void Bootstrap() { + for (auto &replica : Config->Replicas) { + const TActorId x = Register(new TExamplePublishReplicaActor(SelfId(), replica, Key, Payload)); + PublishActors.emplace_back(x); + } + + Become(&TThis::StateWork); + } + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + cFunc(TEvents::TEvPoison::EventType, PassAway); + default: + break; + } + } +}; + +IActor* CreatePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what) { + return new TExamplePublishActor(config, key, what); +} diff --git a/library/cpp/actors/examples/02_discovery/replica.cpp b/library/cpp/actors/examples/02_discovery/replica.cpp new file mode 100644 index 00000000000..74fdfc1910d --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/replica.cpp @@ -0,0 +1,182 @@ +#include "services.h" +#include +#include +#include +#include +#include +#include + +class TExampleReplicaActor : public TActor { + using TOwnerIndex = TMap; + using TKeyIndex = THashMap>; + + struct TEntry { + TString Payload; + TActorId Owner; + TOwnerIndex::iterator OwnerIt; + TKeyIndex::iterator KeyIt; + }; + + TVector Entries; + TVector AvailableEntries; + + TOwnerIndex IndexOwner; + TKeyIndex IndexKey; + + ui32 AllocateEntry() { + ui32 ret; + if (AvailableEntries) { + ret = AvailableEntries.back(); + AvailableEntries.pop_back(); + } + else { + ret = Entries.size(); + Entries.emplace_back(); + } + + return ret; + } + + bool IsLastEntryOnNode(TOwnerIndex::iterator ownerIt) { + const ui32 ownerNodeId = ownerIt->first.NodeId(); + if (ownerIt != IndexOwner.begin()) { + auto x = ownerIt; + --x; + if (x->first.NodeId() == ownerNodeId) + return false; + } + + ++ownerIt; + if (ownerIt != IndexOwner.end()) { + if (ownerIt->first.NodeId() == ownerNodeId) + return false; + } + + return true; + } + + void CleanupEntry(ui32 entryIndex) { + TEntry &entry = Entries[entryIndex]; + entry.KeyIt->second.erase(entryIndex); + if (entry.KeyIt->second.empty()) + IndexKey.erase(entry.KeyIt); + + if (IsLastEntryOnNode(entry.OwnerIt)) + Send(TlsActivationContext->ExecutorThread.ActorSystem->InterconnectProxy(entry.OwnerIt->first.NodeId()), new TEvents::TEvUnsubscribe()); + + IndexOwner.erase(entry.OwnerIt); + + TString().swap(entry.Payload); + entry.Owner = TActorId(); + entry.KeyIt = IndexKey.end(); + entry.OwnerIt = IndexOwner.end(); + + AvailableEntries.emplace_back(entryIndex); + } + + void Handle(TEvExample::TEvReplicaLookup::TPtr &ev) { + auto &record = ev->Get()->Record; + const auto &key = record.GetKey(); + + auto keyIt = IndexKey.find(key); + if (keyIt == IndexKey.end()) { + Send(ev->Sender, new TEvExample::TEvReplicaInfo(key), 0, ev->Cookie); + return; + } + + auto reply = MakeHolder(key); + reply->Record.MutablePayload()->Reserve(keyIt->second.size()); + for (ui32 entryIndex : keyIt->second) { + const TEntry &entry = Entries[entryIndex]; + reply->Record.AddPayload(entry.Payload); + } + + Send(ev->Sender, std::move(reply), 0, ev->Cookie); + } + + void Handle(TEvExample::TEvReplicaPublish::TPtr &ev) { + auto &record = ev->Get()->Record; + const TString &key = record.GetKey(); + const TString &payload = record.GetPayload(); + const TActorId &owner = ev->Sender; + + auto ownerIt = IndexOwner.find(owner); + if (ownerIt != IndexOwner.end()) { + const ui32 entryIndex = ownerIt->second; + TEntry &entry = Entries[entryIndex]; + if (entry.KeyIt->first != key) { + // reply nothing, request suspicious + return; + } + + entry.Payload = payload; + } + else { + const ui32 entryIndex = AllocateEntry(); + TEntry &entry = Entries[entryIndex]; + + entry.Payload = payload; + entry.Owner = owner; + + entry.OwnerIt = IndexOwner.emplace(owner, entryIndex).first; + entry.KeyIt = IndexKey.emplace(std::make_pair(key, TSet())).first; + entry.KeyIt->second.emplace(entryIndex); + + Send(owner, new TEvExample::TEvReplicaPublishAck(), IEventHandle::FlagTrackDelivery | IEventHandle::FlagSubscribeOnSession, ev->Cookie); + } + } + + void Handle(TEvents::TEvUndelivered::TPtr &ev) { + auto ownerIt = IndexOwner.find(ev->Sender); + if (ownerIt == IndexOwner.end()) + return; + + CleanupEntry(ownerIt->second); + } + + void Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev) { + auto *msg = ev->Get(); + const ui32 nodeId = msg->NodeId; + auto ownerIt = IndexOwner.lower_bound(TActorId(nodeId, 0, 0, 0)); + while (ownerIt != IndexOwner.end() && ownerIt->first.NodeId() == nodeId) { + const ui32 idx = ownerIt->second; + ++ownerIt; + CleanupEntry(idx); + } + } + +public: + static constexpr IActor::EActivityType ActorActivityType() { + // define app-specific activity tag to track elapsed cpu | handled events | actor count in Solomon + return EActorActivity::ACTORLIB_COMMON; + } + + TExampleReplicaActor() + : TActor(&TThis::StateWork) + {} + + STFUNC(StateWork) { + Y_UNUSED(ctx); + switch (ev->GetTypeRewrite()) { + hFunc(TEvExample::TEvReplicaLookup, Handle); + hFunc(TEvExample::TEvReplicaPublish, Handle); + hFunc(TEvents::TEvUndelivered, Handle); + hFunc(TEvInterconnect::TEvNodeDisconnected, Handle); + + IgnoreFunc(TEvInterconnect::TEvNodeConnected); + default: + // here is place to spam some log message on unknown events + break; + } + } +}; + +IActor* CreateReplica() { + return new TExampleReplicaActor(); +} + +TActorId MakeReplicaId(ui32 nodeid) { + char x[12] = { 'r', 'p', 'l' }; + memcpy(x + 5, &nodeid, sizeof(ui32)); + return TActorId(nodeid, TStringBuf(x, 12)); +} diff --git a/library/cpp/actors/examples/02_discovery/services.h b/library/cpp/actors/examples/02_discovery/services.h new file mode 100644 index 00000000000..266517c5772 --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/services.h @@ -0,0 +1,85 @@ +#pragma once +#include + +#include +#include +#include +#include + +#include + +using namespace NActors; + +struct TExampleStorageConfig : public TThrRefBase { + TVector Replicas; +}; + +struct TEvExample { + enum EEv { + EvReplicaLookup = EventSpaceBegin(TEvents::ES_USERSPACE + 1), + EvReplicaPublish, + + EvReplicaInfo = EventSpaceBegin(TEvents::ES_USERSPACE + 2), + EvReplicaPublishAck, + + EvInfo = EventSpaceBegin(TEvents::ES_USERSPACE + 3), + }; + + struct TEvReplicaLookup : public TEventPB { + TEvReplicaLookup() + {} + + TEvReplicaLookup(const TString &key) + { + Record.SetKey(key); + } + }; + + struct TEvReplicaPublish : public TEventPB { + TEvReplicaPublish() + {} + + TEvReplicaPublish(const TString &key, const TString &payload) + { + Record.SetKey(key); + Record.SetPayload(payload); + } + }; + + struct TEvReplicaInfo : public TEventPB { + TEvReplicaInfo() + {} + + TEvReplicaInfo(const TString &key) + { + Record.SetKey(key); + } + }; + + struct TEvReplicaPublishAck : public TEventPB { + TEvReplicaPublishAck() + {} + + TEvReplicaPublishAck(const TString &key) + { + Record.SetKey(key); + } + }; + + struct TEvInfo : public TEventLocal { + const TString Key; + const TVector Payloads; + + TEvInfo(const TString &key, TVector &&payloads) + : Key(key) + , Payloads(payloads) + {} + }; +}; + +IActor* CreateReplica(); +IActor* CreatePublishActor(TExampleStorageConfig *config, const TString &key, const TString &what); +IActor* CreateLookupActor(TExampleStorageConfig *config, const TString &key, TActorId replyTo); +IActor* CreateEndpointActor(TExampleStorageConfig *config, const TString &publishKey, ui16 httpPort); + +TActorId MakeReplicaId(ui32 nodeid); diff --git a/library/cpp/actors/examples/02_discovery/ya.make b/library/cpp/actors/examples/02_discovery/ya.make new file mode 100644 index 00000000000..953c13259cb --- /dev/null +++ b/library/cpp/actors/examples/02_discovery/ya.make @@ -0,0 +1,25 @@ +PROGRAM(example_02_discovery) + +ALLOCATOR(LF) + +SRCS( + endpoint.cpp + lookup.cpp + main.cpp + publish.cpp + replica.cpp + services.h +) + +SRCS( + protocol.proto +) + +PEERDIR( + library/cpp/actors/core + library/cpp/actors/dnsresolver + library/cpp/actors/interconnect + library/cpp/actors/http +) + +END() diff --git a/library/cpp/actors/examples/CMakeLists.txt b/library/cpp/actors/examples/CMakeLists.txt new file mode 100644 index 00000000000..bcda1cfeefc --- /dev/null +++ b/library/cpp/actors/examples/CMakeLists.txt @@ -0,0 +1,10 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +add_subdirectory(01_ping_pong) +add_subdirectory(02_discovery) diff --git a/library/cpp/actors/examples/ya.make b/library/cpp/actors/examples/ya.make new file mode 100644 index 00000000000..0a98074b478 --- /dev/null +++ b/library/cpp/actors/examples/ya.make @@ -0,0 +1,4 @@ +RECURSE( + 01_ping_pong + 02_discovery +) -- cgit v1.3