aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-11-29 13:50:45 +0300
committerdcherednik <dcherednik@ydb.tech>2023-11-29 15:47:31 +0300
commit10fd58d05678db9a22303a46178f5ed6c7150601 (patch)
tree3a9837bd2156df5d6e54d182679e8f0f44f7bc7d /library
parente92c0cb46ca4a92ac06cef509ab210296d9f0b99 (diff)
downloadydb-10fd58d05678db9a22303a46178f5ed6c7150601.tar.gz
Use own copy of library/grpc
Diffstat (limited to 'library')
-rw-r--r--library/cpp/CMakeLists.darwin-arm64.txt1
-rw-r--r--library/cpp/CMakeLists.darwin-x86_64.txt1
-rw-r--r--library/cpp/CMakeLists.linux-aarch64.txt1
-rw-r--r--library/cpp/CMakeLists.linux-x86_64.txt1
-rw-r--r--library/cpp/CMakeLists.windows-x86_64.txt1
-rw-r--r--library/cpp/grpc/CMakeLists.txt10
-rw-r--r--library/cpp/grpc/client/CMakeLists.darwin-arm64.txt19
-rw-r--r--library/cpp/grpc/client/CMakeLists.darwin-x86_64.txt19
-rw-r--r--library/cpp/grpc/client/CMakeLists.linux-aarch64.txt20
-rw-r--r--library/cpp/grpc/client/CMakeLists.linux-x86_64.txt20
-rw-r--r--library/cpp/grpc/client/CMakeLists.txt19
-rw-r--r--library/cpp/grpc/client/CMakeLists.windows-x86_64.txt19
-rw-r--r--library/cpp/grpc/client/grpc_client_low.cpp591
-rw-r--r--library/cpp/grpc/client/grpc_client_low.h1426
-rw-r--r--library/cpp/grpc/client/grpc_common.h84
-rw-r--r--library/cpp/grpc/client/ut/grpc_client_low_ut.cpp61
-rw-r--r--library/cpp/grpc/client/ut/ya.make7
-rw-r--r--library/cpp/grpc/client/ya.make16
-rw-r--r--library/cpp/grpc/server/CMakeLists.darwin-arm64.txt35
-rw-r--r--library/cpp/grpc/server/CMakeLists.darwin-x86_64.txt35
-rw-r--r--library/cpp/grpc/server/CMakeLists.linux-aarch64.txt36
-rw-r--r--library/cpp/grpc/server/CMakeLists.linux-x86_64.txt36
-rw-r--r--library/cpp/grpc/server/CMakeLists.txt19
-rw-r--r--library/cpp/grpc/server/CMakeLists.windows-x86_64.txt35
-rw-r--r--library/cpp/grpc/server/actors/CMakeLists.darwin-arm64.txt18
-rw-r--r--library/cpp/grpc/server/actors/CMakeLists.darwin-x86_64.txt18
-rw-r--r--library/cpp/grpc/server/actors/CMakeLists.linux-aarch64.txt19
-rw-r--r--library/cpp/grpc/server/actors/CMakeLists.linux-x86_64.txt19
-rw-r--r--library/cpp/grpc/server/actors/CMakeLists.txt19
-rw-r--r--library/cpp/grpc/server/actors/CMakeLists.windows-x86_64.txt18
-rw-r--r--library/cpp/grpc/server/actors/logger.cpp45
-rw-r--r--library/cpp/grpc/server/actors/logger.h11
-rw-r--r--library/cpp/grpc/server/actors/ya.make11
-rw-r--r--library/cpp/grpc/server/event_callback.cpp1
-rw-r--r--library/cpp/grpc/server/event_callback.h80
-rw-r--r--library/cpp/grpc/server/grpc_async_ctx_base.h109
-rw-r--r--library/cpp/grpc/server/grpc_counters.cpp45
-rw-r--r--library/cpp/grpc/server/grpc_counters.h136
-rw-r--r--library/cpp/grpc/server/grpc_request.cpp59
-rw-r--r--library/cpp/grpc/server/grpc_request.h603
-rw-r--r--library/cpp/grpc/server/grpc_request_base.h124
-rw-r--r--library/cpp/grpc/server/grpc_response.h90
-rw-r--r--library/cpp/grpc/server/grpc_server.cpp251
-rw-r--r--library/cpp/grpc/server/grpc_server.h404
-rw-r--r--library/cpp/grpc/server/logger.h43
-rw-r--r--library/cpp/grpc/server/ut/grpc_response_ut.cpp86
-rw-r--r--library/cpp/grpc/server/ut/stream_adaptor_ut.cpp121
-rw-r--r--library/cpp/grpc/server/ut/ya.make16
-rw-r--r--library/cpp/grpc/server/ya.make20
49 files changed, 0 insertions, 4878 deletions
diff --git a/library/cpp/CMakeLists.darwin-arm64.txt b/library/cpp/CMakeLists.darwin-arm64.txt
index b7794e3005..58a86cdbb5 100644
--- a/library/cpp/CMakeLists.darwin-arm64.txt
+++ b/library/cpp/CMakeLists.darwin-arm64.txt
@@ -38,7 +38,6 @@ add_subdirectory(dwarf_backtrace)
add_subdirectory(enumbitset)
add_subdirectory(execprofile)
add_subdirectory(getopt)
-add_subdirectory(grpc)
add_subdirectory(histogram)
add_subdirectory(html)
add_subdirectory(http)
diff --git a/library/cpp/CMakeLists.darwin-x86_64.txt b/library/cpp/CMakeLists.darwin-x86_64.txt
index ca80e3eed3..5692463e65 100644
--- a/library/cpp/CMakeLists.darwin-x86_64.txt
+++ b/library/cpp/CMakeLists.darwin-x86_64.txt
@@ -39,7 +39,6 @@ add_subdirectory(dwarf_backtrace)
add_subdirectory(enumbitset)
add_subdirectory(execprofile)
add_subdirectory(getopt)
-add_subdirectory(grpc)
add_subdirectory(histogram)
add_subdirectory(html)
add_subdirectory(http)
diff --git a/library/cpp/CMakeLists.linux-aarch64.txt b/library/cpp/CMakeLists.linux-aarch64.txt
index b7794e3005..58a86cdbb5 100644
--- a/library/cpp/CMakeLists.linux-aarch64.txt
+++ b/library/cpp/CMakeLists.linux-aarch64.txt
@@ -38,7 +38,6 @@ add_subdirectory(dwarf_backtrace)
add_subdirectory(enumbitset)
add_subdirectory(execprofile)
add_subdirectory(getopt)
-add_subdirectory(grpc)
add_subdirectory(histogram)
add_subdirectory(html)
add_subdirectory(http)
diff --git a/library/cpp/CMakeLists.linux-x86_64.txt b/library/cpp/CMakeLists.linux-x86_64.txt
index ca80e3eed3..5692463e65 100644
--- a/library/cpp/CMakeLists.linux-x86_64.txt
+++ b/library/cpp/CMakeLists.linux-x86_64.txt
@@ -39,7 +39,6 @@ add_subdirectory(dwarf_backtrace)
add_subdirectory(enumbitset)
add_subdirectory(execprofile)
add_subdirectory(getopt)
-add_subdirectory(grpc)
add_subdirectory(histogram)
add_subdirectory(html)
add_subdirectory(http)
diff --git a/library/cpp/CMakeLists.windows-x86_64.txt b/library/cpp/CMakeLists.windows-x86_64.txt
index 772027a342..d8d81b3185 100644
--- a/library/cpp/CMakeLists.windows-x86_64.txt
+++ b/library/cpp/CMakeLists.windows-x86_64.txt
@@ -37,7 +37,6 @@ add_subdirectory(dns)
add_subdirectory(enumbitset)
add_subdirectory(execprofile)
add_subdirectory(getopt)
-add_subdirectory(grpc)
add_subdirectory(histogram)
add_subdirectory(html)
add_subdirectory(http)
diff --git a/library/cpp/grpc/CMakeLists.txt b/library/cpp/grpc/CMakeLists.txt
deleted file mode 100644
index 69f43c42a7..0000000000
--- a/library/cpp/grpc/CMakeLists.txt
+++ /dev/null
@@ -1,10 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-add_subdirectory(client)
-add_subdirectory(server)
diff --git a/library/cpp/grpc/client/CMakeLists.darwin-arm64.txt b/library/cpp/grpc/client/CMakeLists.darwin-arm64.txt
deleted file mode 100644
index db6f6bfdc1..0000000000
--- a/library/cpp/grpc/client/CMakeLists.darwin-arm64.txt
+++ /dev/null
@@ -1,19 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(cpp-grpc-client)
-target_link_libraries(cpp-grpc-client PUBLIC
- contrib-libs-cxxsupp
- yutil
- contrib-libs-grpc
- cpp-deprecated-atomic
-)
-target_sources(cpp-grpc-client PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/client/grpc_client_low.cpp
-)
diff --git a/library/cpp/grpc/client/CMakeLists.darwin-x86_64.txt b/library/cpp/grpc/client/CMakeLists.darwin-x86_64.txt
deleted file mode 100644
index db6f6bfdc1..0000000000
--- a/library/cpp/grpc/client/CMakeLists.darwin-x86_64.txt
+++ /dev/null
@@ -1,19 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(cpp-grpc-client)
-target_link_libraries(cpp-grpc-client PUBLIC
- contrib-libs-cxxsupp
- yutil
- contrib-libs-grpc
- cpp-deprecated-atomic
-)
-target_sources(cpp-grpc-client PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/client/grpc_client_low.cpp
-)
diff --git a/library/cpp/grpc/client/CMakeLists.linux-aarch64.txt b/library/cpp/grpc/client/CMakeLists.linux-aarch64.txt
deleted file mode 100644
index 80f660d62e..0000000000
--- a/library/cpp/grpc/client/CMakeLists.linux-aarch64.txt
+++ /dev/null
@@ -1,20 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(cpp-grpc-client)
-target_link_libraries(cpp-grpc-client PUBLIC
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
- contrib-libs-grpc
- cpp-deprecated-atomic
-)
-target_sources(cpp-grpc-client PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/client/grpc_client_low.cpp
-)
diff --git a/library/cpp/grpc/client/CMakeLists.linux-x86_64.txt b/library/cpp/grpc/client/CMakeLists.linux-x86_64.txt
deleted file mode 100644
index 80f660d62e..0000000000
--- a/library/cpp/grpc/client/CMakeLists.linux-x86_64.txt
+++ /dev/null
@@ -1,20 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(cpp-grpc-client)
-target_link_libraries(cpp-grpc-client PUBLIC
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
- contrib-libs-grpc
- cpp-deprecated-atomic
-)
-target_sources(cpp-grpc-client PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/client/grpc_client_low.cpp
-)
diff --git a/library/cpp/grpc/client/CMakeLists.txt b/library/cpp/grpc/client/CMakeLists.txt
deleted file mode 100644
index 2dce3a77fe..0000000000
--- a/library/cpp/grpc/client/CMakeLists.txt
+++ /dev/null
@@ -1,19 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-aarch64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
- include(CMakeLists.darwin-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64")
- include(CMakeLists.darwin-arm64.txt)
-elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
- include(CMakeLists.windows-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-x86_64.txt)
-endif()
diff --git a/library/cpp/grpc/client/CMakeLists.windows-x86_64.txt b/library/cpp/grpc/client/CMakeLists.windows-x86_64.txt
deleted file mode 100644
index db6f6bfdc1..0000000000
--- a/library/cpp/grpc/client/CMakeLists.windows-x86_64.txt
+++ /dev/null
@@ -1,19 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(cpp-grpc-client)
-target_link_libraries(cpp-grpc-client PUBLIC
- contrib-libs-cxxsupp
- yutil
- contrib-libs-grpc
- cpp-deprecated-atomic
-)
-target_sources(cpp-grpc-client PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/client/grpc_client_low.cpp
-)
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp
deleted file mode 100644
index 0e8b97f256..0000000000
--- a/library/cpp/grpc/client/grpc_client_low.cpp
+++ /dev/null
@@ -1,591 +0,0 @@
-#include "grpc_client_low.h"
-#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
-#include <contrib/libs/grpc/include/grpc/support/log.h>
-
-#include <library/cpp/containers/stack_vector/stack_vec.h>
-
-#include <util/string/printf.h>
-#include <util/system/thread.h>
-#include <util/random/random.h>
-
-#if !defined(_WIN32) && !defined(_WIN64)
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#endif
-
-namespace NGrpc {
-
-void EnableGRpcTracing() {
- grpc_tracer_set_enabled("tcp", true);
- grpc_tracer_set_enabled("client_channel", true);
- grpc_tracer_set_enabled("channel", true);
- grpc_tracer_set_enabled("api", true);
- grpc_tracer_set_enabled("connectivity_state", true);
- grpc_tracer_set_enabled("handshaker", true);
- grpc_tracer_set_enabled("http", true);
- grpc_tracer_set_enabled("http2_stream_state", true);
- grpc_tracer_set_enabled("op_failure", true);
- grpc_tracer_set_enabled("timer", true);
- gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
-}
-
-class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {
-public:
- TGRpcKeepAliveSocketMutator(int idle, int count, int interval)
- : Idle_(idle)
- , Count_(count)
- , Interval_(interval)
- {
- grpc_socket_mutator_init(this, &VTable);
- }
-private:
- static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {
- return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);
- }
-
- template<typename TVal>
- bool SetOption(int fd, int level, int optname, const TVal& value) {
- return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;
- }
- bool SetOption(int fd) {
- if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {
- Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl;
- return false;
- }
-#ifdef _linux_
- if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {
- Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl;
- return false;
- }
- if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {
- Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl;
- return false;
- }
- if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {
- Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl;
- return false;
- }
-#endif
- return true;
- }
- static bool Mutate(int fd, grpc_socket_mutator* mutator) {
- auto self = Cast(mutator);
- return self->SetOption(fd);
- }
- static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
- const auto* selfA = Cast(a);
- const auto* selfB = Cast(b);
- auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);
- auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);
- return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;
- }
- static void Destroy(grpc_socket_mutator* mutator) {
- delete Cast(mutator);
- }
- static bool Mutate2(const grpc_mutate_socket_info* info, grpc_socket_mutator* mutator) {
- auto self = Cast(mutator);
- return self->SetOption(info->fd);
- }
-
- static grpc_socket_mutator_vtable VTable;
- const int Idle_;
- const int Count_;
- const int Interval_;
-};
-
-grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
- {
- &TGRpcKeepAliveSocketMutator::Mutate,
- &TGRpcKeepAliveSocketMutator::Compare,
- &TGRpcKeepAliveSocketMutator::Destroy,
- &TGRpcKeepAliveSocketMutator::Mutate2
- };
-
-TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
- : TcpKeepAliveSettings_(tcpKeepAliveSettings)
- , ExpireTime_(expireTime)
- , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
-{}
-
-void TChannelPool::GetStubsHolderLocked(
- const TString& channelId,
- const TGRpcClientConfig& config,
- std::function<void(TStubsHolder&)> cb)
-{
- {
- std::shared_lock readGuard(RWMutex_);
- const auto it = Pool_.find(channelId);
- if (it != Pool_.end()) {
- if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
- return cb(it->second);
- }
- }
- }
- {
- std::unique_lock writeGuard(RWMutex_);
- {
- auto it = Pool_.find(channelId);
- if (it != Pool_.end()) {
- if (!it->second.IsChannelBroken()) {
- EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
- auto now = Now();
- LastUsedQueue_.emplace(now, channelId);
- it->second.SetLastUseTime(now);
- return cb(it->second);
- } else {
- // This channel can't be used. Remove from pool to create new one
- EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
- Pool_.erase(it);
- }
- }
- }
- TGRpcKeepAliveSocketMutator* mutator = nullptr;
- // will be destroyed inside grpc
- if (TcpKeepAliveSettings_.Enabled) {
- mutator = new TGRpcKeepAliveSocketMutator(
- TcpKeepAliveSettings_.Idle,
- TcpKeepAliveSettings_.Count,
- TcpKeepAliveSettings_.Interval
- );
- }
- cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
- LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
- }
-}
-
-void TChannelPool::DeleteChannel(const TString& channelId) {
- std::unique_lock writeLock(RWMutex_);
- auto poolIt = Pool_.find(channelId);
- if (poolIt != Pool_.end()) {
- EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
- Pool_.erase(poolIt);
- }
-}
-
-void TChannelPool::DeleteExpiredStubsHolders() {
- std::unique_lock writeLock(RWMutex_);
- auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
- for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){
- Pool_.erase(i->second);
- }
- LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired);
-}
-
-void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) {
- auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime);
- auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;});
- Y_ABORT_UNLESS(pos != LastUsedQueue_.end(), "data corruption at TChannelPool");
- LastUsedQueue_.erase(pos);
-}
-
-static void PullEvents(grpc::CompletionQueue* cq) {
- TThread::SetCurrentThreadName("grpc_client");
- while (true) {
- void* tag;
- bool ok;
-
- if (!cq->Next(&tag, &ok)) {
- break;
- }
-
- if (auto* ev = static_cast<IQueueClientEvent*>(tag)) {
- if (!ev->Execute(ok)) {
- ev->Destroy();
- }
- }
- }
-}
-
-class TGRpcClientLow::TContextImpl final
- : public std::enable_shared_from_this<TContextImpl>
- , public IQueueClientContext
-{
- friend class TGRpcClientLow;
-
- using TCallback = std::function<void()>;
- using TContextPtr = std::shared_ptr<TContextImpl>;
-
-public:
- ~TContextImpl() override {
- Y_ABORT_UNLESS(CountChildren() == 0,
- "Destructor called with non-empty children");
-
- if (Parent) {
- Parent->ForgetContext(this);
- } else if (Y_LIKELY(Owner)) {
- Owner->ForgetContext(this);
- }
- }
-
- /**
- * Helper for locking child pointer from a parent container
- */
- static TContextPtr LockChildPtr(TContextImpl* ptr) {
- if (ptr) {
- // N.B. it is safe to do as long as it's done under a mutex and
- // pointer is among valid children. When that's the case we
- // know that TContextImpl destructor has not finished yet, so
- // the object is valid. The lock() method may return nullptr
- // though, if the object is being destructed right now.
- return ptr->weak_from_this().lock();
- } else {
- return nullptr;
- }
- }
-
- void ForgetContext(TContextImpl* child) {
- std::unique_lock<std::mutex> guard(Mutex);
-
- auto removed = RemoveChild(child);
- Y_ABORT_UNLESS(removed, "Unexpected ForgetContext(%p)", child);
- }
-
- IQueueClientContextPtr CreateContext() override {
- auto self = shared_from_this();
- auto child = std::make_shared<TContextImpl>();
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
-
- AddChild(child.get());
-
- // It's now safe to initialize parent and owner
- child->Parent = std::move(self);
- child->Owner = Owner;
- child->CQ = CQ;
-
- // Propagate cancellation to a child context
- if (Cancelled.load(std::memory_order_relaxed)) {
- child->Cancelled.store(true, std::memory_order_relaxed);
- }
- }
-
- return child;
- }
-
- grpc::CompletionQueue* CompletionQueue() override {
- Y_ABORT_UNLESS(Owner, "Uninitialized context");
- return CQ;
- }
-
- bool IsCancelled() const override {
- return Cancelled.load(std::memory_order_acquire);
- }
-
- bool Cancel() override {
- TStackVec<TCallback, 1> callbacks;
- TStackVec<TContextPtr, 2> children;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
-
- if (Cancelled.load(std::memory_order_relaxed)) {
- // Already cancelled in another thread
- return false;
- }
-
- callbacks.reserve(Callbacks.size());
- children.reserve(CountChildren());
-
- for (auto& callback : Callbacks) {
- callbacks.emplace_back().swap(callback);
- }
- Callbacks.clear();
-
- // Collect all children we need to cancel
- // N.B. we don't clear children links (cleared by destructors)
- // N.B. some children may be stuck in destructors at the moment
- for (TContextImpl* ptr : InlineChildren) {
- if (auto child = LockChildPtr(ptr)) {
- children.emplace_back(std::move(child));
- }
- }
- for (auto* ptr : Children) {
- if (auto child = LockChildPtr(ptr)) {
- children.emplace_back(std::move(child));
- }
- }
-
- Cancelled.store(true, std::memory_order_release);
- }
-
- // Call directly subscribed callbacks
- if (callbacks) {
- RunCallbacksNoExcept(callbacks);
- }
-
- // Cancel all children
- for (auto& child : children) {
- child->Cancel();
- child.reset();
- }
-
- return true;
- }
-
- void SubscribeCancel(TCallback callback) override {
- Y_ABORT_UNLESS(callback, "SubscribeCancel called with an empty callback");
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
-
- if (!Cancelled.load(std::memory_order_relaxed)) {
- Callbacks.emplace_back().swap(callback);
- return;
- }
- }
-
- // Already cancelled, run immediately
- callback();
- }
-
-private:
- void AddChild(TContextImpl* child) {
- for (TContextImpl*& slot : InlineChildren) {
- if (!slot) {
- slot = child;
- return;
- }
- }
-
- Children.insert(child);
- }
-
- bool RemoveChild(TContextImpl* child) {
- for (TContextImpl*& slot : InlineChildren) {
- if (slot == child) {
- slot = nullptr;
- return true;
- }
- }
-
- return Children.erase(child);
- }
-
- size_t CountChildren() {
- size_t count = 0;
-
- for (TContextImpl* ptr : InlineChildren) {
- if (ptr) {
- ++count;
- }
- }
-
- return count + Children.size();
- }
-
- template<class TCallbacks>
- static void RunCallbacksNoExcept(TCallbacks& callbacks) noexcept {
- for (auto& callback : callbacks) {
- if (callback) {
- callback();
- callback = nullptr;
- }
- }
- }
-
-private:
- // We want a simple lock here, without extra memory allocations
- std::mutex Mutex;
-
- // These fields are initialized on successful registration
- TContextPtr Parent;
- TGRpcClientLow* Owner = nullptr;
- grpc::CompletionQueue* CQ = nullptr;
-
- // Some children are stored inline, others are in a set
- std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
- std::unordered_set<TContextImpl*> Children;
-
- // Single callback is stored without extra allocations
- TStackVec<TCallback, 1> Callbacks;
-
- // Atomic flag for a faster IsCancelled() implementation
- std::atomic<bool> Cancelled;
-};
-
-TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
- : UseCompletionQueuePerThread_(useCompletionQueuePerThread)
-{
- Init(numWorkerThread);
-}
-
-void TGRpcClientLow::Init(size_t numWorkerThread) {
- SetCqState(WORKING);
- if (UseCompletionQueuePerThread_) {
- for (size_t i = 0; i < numWorkerThread; i++) {
- CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
- auto* cq = CQS_.back().get();
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- }
- } else {
- CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
- auto* cq = CQS_.back().get();
- for (size_t i = 0; i < numWorkerThread; i++) {
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- }
- }
-}
-
-void TGRpcClientLow::AddWorkerThreadForTest() {
- if (UseCompletionQueuePerThread_) {
- CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
- auto* cq = CQS_.back().get();
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- } else {
- auto* cq = CQS_.back().get();
- WorkerThreads_.emplace_back(SystemThreadFactory()->Run([cq]() {
- PullEvents(cq);
- }).Release());
- }
-}
-
-TGRpcClientLow::~TGRpcClientLow() {
- StopInternal(true);
- WaitInternal();
-}
-
-void TGRpcClientLow::Stop(bool wait) {
- StopInternal(false);
-
- if (wait) {
- WaitInternal();
- }
-}
-
-void TGRpcClientLow::StopInternal(bool silent) {
- bool shutdown;
-
- TVector<TContextImpl::TContextPtr> cancelQueue;
-
- {
- std::unique_lock<std::mutex> guard(Mtx_);
-
- auto allowStateChange = [&]() {
- switch (GetCqState()) {
- case WORKING:
- return true;
- case STOP_SILENT:
- return !silent;
- case STOP_EXPLICIT:
- return false;
- }
-
- Y_UNREACHABLE();
- };
-
- if (!allowStateChange()) {
- // Completion queue is already stopping
- return;
- }
-
- SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT);
-
- if (!silent && !Contexts_.empty()) {
- cancelQueue.reserve(Contexts_.size());
- for (auto* ptr : Contexts_) {
- // N.B. some contexts may be stuck in destructors
- if (auto context = TContextImpl::LockChildPtr(ptr)) {
- cancelQueue.emplace_back(std::move(context));
- }
- }
- }
-
- shutdown = Contexts_.empty();
- }
-
- for (auto& context : cancelQueue) {
- context->Cancel();
- context.reset();
- }
-
- if (shutdown) {
- for (auto& cq : CQS_) {
- cq->Shutdown();
- }
- }
-}
-
-void TGRpcClientLow::WaitInternal() {
- std::unique_lock<std::mutex> guard(JoinMutex_);
-
- for (auto& ti : WorkerThreads_) {
- ti->Join();
- }
-}
-
-void TGRpcClientLow::WaitIdle() {
- std::unique_lock<std::mutex> guard(Mtx_);
-
- while (!Contexts_.empty()) {
- ContextsEmpty_.wait(guard);
- }
-}
-
-std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
- std::unique_lock<std::mutex> guard(Mtx_);
-
- auto allowCreateContext = [&]() {
- switch (GetCqState()) {
- case WORKING:
- return true;
- case STOP_SILENT:
- case STOP_EXPLICIT:
- return false;
- }
-
- Y_UNREACHABLE();
- };
-
- if (!allowCreateContext()) {
- // New context creation is forbidden
- return nullptr;
- }
-
- auto context = std::make_shared<TContextImpl>();
- Contexts_.insert(context.get());
- context->Owner = this;
- if (UseCompletionQueuePerThread_) {
- context->CQ = CQS_[RandomNumber(CQS_.size())].get();
- } else {
- context->CQ = CQS_[0].get();
- }
- return context;
-}
-
-void TGRpcClientLow::ForgetContext(TContextImpl* context) {
- bool shutdown = false;
-
- {
- std::unique_lock<std::mutex> guard(Mtx_);
-
- if (!Contexts_.erase(context)) {
- Y_ABORT("Unexpected ForgetContext(%p)", context);
- }
-
- if (Contexts_.empty()) {
- if (IsStopping()) {
- shutdown = true;
- }
-
- ContextsEmpty_.notify_all();
- }
- }
-
- if (shutdown) {
- // This was the last context, shutdown CQ
- for (auto& cq : CQS_) {
- cq->Shutdown();
- }
- }
-}
-
-} // namespace NGRpc
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h
deleted file mode 100644
index acec2de075..0000000000
--- a/library/cpp/grpc/client/grpc_client_low.h
+++ /dev/null
@@ -1,1426 +0,0 @@
-#pragma once
-
-#include "grpc_common.h"
-
-#include <library/cpp/deprecated/atomic/atomic.h>
-
-#include <util/thread/factory.h>
-#include <util/string/builder.h>
-#include <grpc++/grpc++.h>
-#include <grpc++/support/async_stream.h>
-#include <grpc++/support/async_unary_call.h>
-
-#include <deque>
-#include <typeindex>
-#include <typeinfo>
-#include <variant>
-#include <vector>
-#include <unordered_map>
-#include <unordered_set>
-#include <mutex>
-#include <shared_mutex>
-
-/*
- * This file contains low level logic for grpc
- * This file should not be used in high level code without special reason
- */
-namespace NGrpc {
-
-const size_t DEFAULT_NUM_THREADS = 2;
-
-////////////////////////////////////////////////////////////////////////////////
-
-void EnableGRpcTracing();
-
-////////////////////////////////////////////////////////////////////////////////
-
-struct TTcpKeepAliveSettings {
- bool Enabled;
- size_t Idle;
- size_t Count;
- size_t Interval;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-
-// Common interface used to execute action from grpc cq routine
-class IQueueClientEvent {
-public:
- virtual ~IQueueClientEvent() = default;
-
- //! Execute an action defined by implementation
- virtual bool Execute(bool ok) = 0;
-
- //! Finish and destroy event
- virtual void Destroy() = 0;
-};
-
-// Implementation of IQueueClientEvent that reduces allocations
-template<class TSelf>
-class TQueueClientFixedEvent : private IQueueClientEvent {
- using TCallback = void (TSelf::*)(bool);
-
-public:
- TQueueClientFixedEvent(TSelf* self, TCallback callback)
- : Self(self)
- , Callback(callback)
- { }
-
- IQueueClientEvent* Prepare() {
- Self->Ref();
- return this;
- }
-
-private:
- bool Execute(bool ok) override {
- ((*Self).*Callback)(ok);
- return false;
- }
-
- void Destroy() override {
- Self->UnRef();
- }
-
-private:
- TSelf* const Self;
- TCallback const Callback;
-};
-
-class IQueueClientContext;
-using IQueueClientContextPtr = std::shared_ptr<IQueueClientContext>;
-
-// Provider of IQueueClientContext instances
-class IQueueClientContextProvider {
-public:
- virtual ~IQueueClientContextProvider() = default;
-
- virtual IQueueClientContextPtr CreateContext() = 0;
-};
-
-// Activity context for a low-level client
-class IQueueClientContext : public IQueueClientContextProvider {
-public:
- virtual ~IQueueClientContext() = default;
-
- //! Returns CompletionQueue associated with the client
- virtual grpc::CompletionQueue* CompletionQueue() = 0;
-
- //! Returns true if context has been cancelled
- virtual bool IsCancelled() const = 0;
-
- //! Tries to cancel context, calling all registered callbacks
- virtual bool Cancel() = 0;
-
- //! Subscribes callback to cancellation
- //
- // Note there's no way to unsubscribe, if subscription is temporary
- // make sure you create a new context with CreateContext and release
- // it as soon as it's no longer needed.
- virtual void SubscribeCancel(std::function<void()> callback) = 0;
-
- //! Subscribes callback to cancellation
- //
- // This alias is for compatibility with older code.
- void SubscribeStop(std::function<void()> callback) {
- SubscribeCancel(std::move(callback));
- }
-};
-
-// Represents grpc status and error message string
-struct TGrpcStatus {
- TString Msg;
- TString Details;
- int GRpcStatusCode;
- bool InternalError;
- std::multimap<TString, TString> ServerTrailingMetadata;
-
- TGrpcStatus()
- : GRpcStatusCode(grpc::StatusCode::OK)
- , InternalError(false)
- { }
-
- TGrpcStatus(TString msg, int statusCode, bool internalError)
- : Msg(std::move(msg))
- , GRpcStatusCode(statusCode)
- , InternalError(internalError)
- { }
-
- TGrpcStatus(grpc::StatusCode status, TString msg, TString details = {})
- : Msg(std::move(msg))
- , Details(std::move(details))
- , GRpcStatusCode(status)
- , InternalError(false)
- { }
-
- TGrpcStatus(const grpc::Status& status)
- : TGrpcStatus(status.error_code(), TString(status.error_message()), TString(status.error_details()))
- { }
-
- TGrpcStatus& operator=(const grpc::Status& status) {
- Msg = TString(status.error_message());
- Details = TString(status.error_details());
- GRpcStatusCode = status.error_code();
- InternalError = false;
- return *this;
- }
-
- static TGrpcStatus Internal(TString msg) {
- return { std::move(msg), -1, true };
- }
-
- bool Ok() const {
- return !InternalError && GRpcStatusCode == grpc::StatusCode::OK;
- }
-
- TStringBuilder ToDebugString() const {
- TStringBuilder ret;
- ret << "gRpcStatusCode: " << GRpcStatusCode;
- if(!Ok())
- ret << ", Msg: " << Msg << ", Details: " << Details << ", InternalError: " << InternalError;
- return ret;
- }
-};
-
-bool inline IsGRpcStatusGood(const TGrpcStatus& status) {
- return status.Ok();
-}
-
-// Response callback type - this callback will be called when request is finished
-// (or after getting each chunk in case of streaming mode)
-template<typename TResponse>
-using TResponseCallback = std::function<void (TGrpcStatus&&, TResponse&&)>;
-
-template<typename TResponse>
-using TAdvancedResponseCallback = std::function<void (const grpc::ClientContext&, TGrpcStatus&&, TResponse&&)>;
-
-// Call associated metadata
-struct TCallMeta {
- std::shared_ptr<grpc::CallCredentials> CallCredentials;
- std::vector<std::pair<TString, TString>> Aux;
- std::variant<TDuration, TInstant> Timeout; // timeout as duration from now or time point in future
-};
-
-class TGRpcRequestProcessorCommon {
-protected:
- void ApplyMeta(const TCallMeta& meta) {
- for (const auto& rec : meta.Aux) {
- Context.AddMetadata(rec.first, rec.second);
- }
- if (meta.CallCredentials) {
- Context.set_credentials(meta.CallCredentials);
- }
- if (const TDuration* timeout = std::get_if<TDuration>(&meta.Timeout)) {
- if (*timeout) {
- auto deadline = gpr_time_add(
- gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_time_from_micros(timeout->MicroSeconds(), GPR_TIMESPAN));
- Context.set_deadline(deadline);
- }
- } else if (const TInstant* deadline = std::get_if<TInstant>(&meta.Timeout)) {
- if (*deadline) {
- Context.set_deadline(gpr_time_from_micros(deadline->MicroSeconds(), GPR_CLOCK_MONOTONIC));
- }
- }
- }
-
- void GetInitialMetadata(std::unordered_multimap<TString, TString>* metadata) {
- for (const auto& [key, value] : Context.GetServerInitialMetadata()) {
- metadata->emplace(
- TString(key.begin(), key.end()),
- TString(value.begin(), value.end())
- );
- }
- }
-
- grpc::Status Status;
- grpc::ClientContext Context;
- std::shared_ptr<IQueueClientContext> LocalContext;
-};
-
-template<typename TStub, typename TRequest, typename TResponse>
-class TSimpleRequestProcessor
- : public TThrRefBase
- , public IQueueClientEvent
- , public TGRpcRequestProcessorCommon {
- using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>;
- template<typename> friend class TServiceConnection;
-public:
- using TPtr = TIntrusivePtr<TSimpleRequestProcessor>;
- using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*);
-
- explicit TSimpleRequestProcessor(TResponseCallback<TResponse>&& callback)
- : Callback_(std::move(callback))
- { }
-
- ~TSimpleRequestProcessor() {
- if (!Replied_ && Callback_) {
- Callback_(TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));
- Callback_ = nullptr; // free resources as early as possible
- }
- }
-
- bool Execute(bool ok) override {
- {
- std::unique_lock<std::mutex> guard(Mutex_);
- LocalContext.reset();
- }
- TGrpcStatus status;
- if (ok) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- Replied_ = true;
- Callback_(std::move(status), std::move(Reply_));
- Callback_ = nullptr; // free resources as early as possible
- return false;
- }
-
- void Destroy() override {
- UnRef();
- }
-
-private:
- IQueueClientEvent* FinishedEvent() {
- Ref();
- return this;
- }
-
- void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {
- auto context = provider->CreateContext();
- if (!context) {
- Replied_ = true;
- Callback_(TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_));
- Callback_ = nullptr;
- return;
- }
- {
- std::unique_lock<std::mutex> guard(Mutex_);
- LocalContext = context;
- Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
- Reader_->Finish(&Reply_, &Status, FinishedEvent());
- }
- context->SubscribeStop([self = TPtr(this)] {
- self->Stop();
- });
- }
-
- void Stop() {
- Context.TryCancel();
- }
-
- TResponseCallback<TResponse> Callback_;
- TResponse Reply_;
- std::mutex Mutex_;
- TAsyncReaderPtr Reader_;
-
- bool Replied_ = false;
-};
-
-template<typename TStub, typename TRequest, typename TResponse>
-class TAdvancedRequestProcessor
- : public TThrRefBase
- , public IQueueClientEvent
- , public TGRpcRequestProcessorCommon {
- using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncResponseReader<TResponse>>;
- template<typename> friend class TServiceConnection;
-public:
- using TPtr = TIntrusivePtr<TAdvancedRequestProcessor>;
- using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*);
-
- explicit TAdvancedRequestProcessor(TAdvancedResponseCallback<TResponse>&& callback)
- : Callback_(std::move(callback))
- { }
-
- ~TAdvancedRequestProcessor() {
- if (!Replied_ && Callback_) {
- Callback_(Context, TGrpcStatus::Internal("request left unhandled"), std::move(Reply_));
- Callback_ = nullptr; // free resources as early as possible
- }
- }
-
- bool Execute(bool ok) override {
- {
- std::unique_lock<std::mutex> guard(Mutex_);
- LocalContext.reset();
- }
- TGrpcStatus status;
- if (ok) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- Replied_ = true;
- Callback_(Context, std::move(status), std::move(Reply_));
- Callback_ = nullptr; // free resources as early as possible
- return false;
- }
-
- void Destroy() override {
- UnRef();
- }
-
-private:
- IQueueClientEvent* FinishedEvent() {
- Ref();
- return this;
- }
-
- void Start(TStub& stub, TAsyncRequest asyncRequest, const TRequest& request, IQueueClientContextProvider* provider) {
- auto context = provider->CreateContext();
- if (!context) {
- Replied_ = true;
- Callback_(Context, TGrpcStatus(grpc::StatusCode::CANCELLED, "Client is shutting down"), std::move(Reply_));
- Callback_ = nullptr;
- return;
- }
- {
- std::unique_lock<std::mutex> guard(Mutex_);
- LocalContext = context;
- Reader_ = (stub.*asyncRequest)(&Context, request, context->CompletionQueue());
- Reader_->Finish(&Reply_, &Status, FinishedEvent());
- }
- context->SubscribeStop([self = TPtr(this)] {
- self->Stop();
- });
- }
-
- void Stop() {
- Context.TryCancel();
- }
-
- TAdvancedResponseCallback<TResponse> Callback_;
- TResponse Reply_;
- std::mutex Mutex_;
- TAsyncReaderPtr Reader_;
-
- bool Replied_ = false;
-};
-
-class IStreamRequestCtrl : public TThrRefBase {
-public:
- using TPtr = TIntrusivePtr<IStreamRequestCtrl>;
-
- /**
- * Asynchronously cancel the request
- */
- virtual void Cancel() = 0;
-};
-
-template<class TResponse>
-class IStreamRequestReadProcessor : public IStreamRequestCtrl {
-public:
- using TPtr = TIntrusivePtr<IStreamRequestReadProcessor>;
- using TReadCallback = std::function<void(TGrpcStatus&&)>;
-
- /**
- * Scheduled initial server metadata read from the stream
- */
- virtual void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) = 0;
-
- /**
- * Scheduled response read from the stream
- * Callback will be called with the status if it failed
- * Only one Read or Finish call may be active at a time
- */
- virtual void Read(TResponse* response, TReadCallback callback) = 0;
-
- /**
- * Stop reading and gracefully finish the stream
- * Only one Read or Finish call may be active at a time
- */
- virtual void Finish(TReadCallback callback) = 0;
-
- /**
- * Additional callback to be called when stream has finished
- */
- virtual void AddFinishedCallback(TReadCallback callback) = 0;
-};
-
-template<class TRequest, class TResponse>
-class IStreamRequestReadWriteProcessor : public IStreamRequestReadProcessor<TResponse> {
-public:
- using TPtr = TIntrusivePtr<IStreamRequestReadWriteProcessor>;
- using TWriteCallback = std::function<void(TGrpcStatus&&)>;
-
- /**
- * Scheduled request write to the stream
- */
- virtual void Write(TRequest&& request, TWriteCallback callback = { }) = 0;
-};
-
-class TGRpcKeepAliveSocketMutator;
-
-// Class to hold stubs allocated on channel.
-// It is poor documented part of grpc. See KIKIMR-6109 and comment to this commit
-
-// Stub holds shared_ptr<ChannelInterface>, so we can destroy this holder even if
-// request processor using stub
-class TStubsHolder : public TNonCopyable {
- using TypeInfoRef = std::reference_wrapper<const std::type_info>;
-
- struct THasher {
- std::size_t operator()(TypeInfoRef code) const {
- return code.get().hash_code();
- }
- };
-
- struct TEqualTo {
- bool operator()(TypeInfoRef lhs, TypeInfoRef rhs) const {
- return lhs.get() == rhs.get();
- }
- };
-public:
- TStubsHolder(std::shared_ptr<grpc::ChannelInterface> channel)
- : ChannelInterface_(channel)
- {}
-
- // Returns true if channel can't be used to perform request now
- bool IsChannelBroken() const {
- auto state = ChannelInterface_->GetState(false);
- return state == GRPC_CHANNEL_SHUTDOWN ||
- state == GRPC_CHANNEL_TRANSIENT_FAILURE;
- }
-
- template<typename TStub>
- std::shared_ptr<TStub> GetOrCreateStub() {
- const auto& stubId = typeid(TStub);
- {
- std::shared_lock readGuard(RWMutex_);
- const auto it = Stubs_.find(stubId);
- if (it != Stubs_.end()) {
- return std::static_pointer_cast<TStub>(it->second);
- }
- }
- {
- std::unique_lock writeGuard(RWMutex_);
- auto it = Stubs_.emplace(stubId, nullptr);
- if (!it.second) {
- return std::static_pointer_cast<TStub>(it.first->second);
- } else {
- it.first->second = std::make_shared<TStub>(ChannelInterface_);
- return std::static_pointer_cast<TStub>(it.first->second);
- }
- }
- }
-
- const TInstant& GetLastUseTime() const {
- return LastUsed_;
- }
-
- void SetLastUseTime(const TInstant& time) {
- LastUsed_ = time;
- }
-private:
- TInstant LastUsed_ = Now();
- std::shared_mutex RWMutex_;
- std::unordered_map<TypeInfoRef, std::shared_ptr<void>, THasher, TEqualTo> Stubs_;
- std::shared_ptr<grpc::ChannelInterface> ChannelInterface_;
-};
-
-class TChannelPool {
-public:
- TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime = TDuration::Minutes(6));
- //Allows to CreateStub from TStubsHolder under lock
- //The callback will be called just during GetStubsHolderLocked call
- void GetStubsHolderLocked(const TString& channelId, const TGRpcClientConfig& config, std::function<void(TStubsHolder&)> cb);
- void DeleteChannel(const TString& channelId);
- void DeleteExpiredStubsHolders();
-private:
- std::shared_mutex RWMutex_;
- std::unordered_map<TString, TStubsHolder> Pool_;
- std::multimap<TInstant, TString> LastUsedQueue_;
- TTcpKeepAliveSettings TcpKeepAliveSettings_;
- TDuration ExpireTime_;
- TDuration UpdateReUseTime_;
- void EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId);
-};
-
-template<class TResponse>
-using TStreamReaderCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadProcessor<TResponse>::TPtr)>;
-
-template<typename TStub, typename TRequest, typename TResponse>
-class TStreamRequestReadProcessor
- : public IStreamRequestReadProcessor<TResponse>
- , public TGRpcRequestProcessorCommon {
- template<typename> friend class TServiceConnection;
-public:
- using TSelf = TStreamRequestReadProcessor;
- using TAsyncReaderPtr = std::unique_ptr<grpc::ClientAsyncReader<TResponse>>;
- using TAsyncRequest = TAsyncReaderPtr (TStub::*)(grpc::ClientContext*, const TRequest&, grpc::CompletionQueue*, void*);
- using TReaderCallback = TStreamReaderCallback<TResponse>;
- using TPtr = TIntrusivePtr<TSelf>;
- using TBase = IStreamRequestReadProcessor<TResponse>;
- using TReadCallback = typename TBase::TReadCallback;
-
- explicit TStreamRequestReadProcessor(TReaderCallback&& callback)
- : Callback(std::move(callback))
- {
- Y_ABORT_UNLESS(Callback, "Missing connected callback");
- }
-
- void Cancel() override {
- Context.TryCancel();
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Cancelled = true;
- if (Started && !ReadFinished) {
- if (!ReadActive) {
- ReadFinished = true;
- }
- if (ReadFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- }
- }
-
- void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
- TGrpcStatus status;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished && !HasInitialMetadata) {
- ReadActive = true;
- ReadCallback = std::move(callback);
- InitialMetadata = metadata;
- if (!ReadFinished) {
- Stream->ReadInitialMetadata(OnReadDoneTag.Prepare());
- }
- return;
- }
- if (!HasInitialMetadata) {
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- } else {
- GetInitialMetadata(metadata);
- }
- }
-
- callback(std::move(status));
- }
-
- void Read(TResponse* message, TReadCallback callback) override {
- TGrpcStatus status;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished) {
- ReadActive = true;
- ReadCallback = std::move(callback);
- if (!ReadFinished) {
- Stream->Read(message, OnReadDoneTag.Prepare());
- }
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
- }
-
- callback(std::move(status));
- }
-
- void Finish(TReadCallback callback) override {
- TGrpcStatus status;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished) {
- ReadActive = true;
- FinishCallback = std::move(callback);
- if (!ReadFinished) {
- ReadFinished = true;
- }
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- callback(std::move(status));
- }
-
- void AddFinishedCallback(TReadCallback callback) override {
- Y_ABORT_UNLESS(callback, "Unexpected empty callback");
-
- TGrpcStatus status;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- if (!Finished) {
- FinishedCallbacks.emplace_back().swap(callback);
- return;
- }
-
- if (FinishedOk) {
- status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- callback(std::move(status));
- }
-
-private:
- void Start(TStub& stub, const TRequest& request, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
- auto context = provider->CreateContext();
- if (!context) {
- auto callback = std::move(Callback);
- TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");
- callback(std::move(status), nullptr);
- return;
- }
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- LocalContext = context;
- Stream = (stub.*asyncRequest)(&Context, request, context->CompletionQueue(), OnStartDoneTag.Prepare());
- }
-
- context->SubscribeStop([self = TPtr(this)] {
- self->Cancel();
- });
- }
-
- void OnReadDone(bool ok) {
- TGrpcStatus status;
- TReadCallback callback;
- std::unordered_multimap<TString, TString>* initialMetadata = nullptr;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_ABORT_UNLESS(ReadActive, "Unexpected Read done callback");
- Y_ABORT_UNLESS(!ReadFinished, "Unexpected ReadFinished flag");
-
- if (!ok || Cancelled) {
- ReadFinished = true;
-
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- if (!ok) {
- // Keep ReadActive=true, so callback is called
- // after the call is finished with an error
- return;
- }
- }
-
- callback = std::move(ReadCallback);
- ReadCallback = nullptr;
- ReadActive = false;
- initialMetadata = InitialMetadata;
- InitialMetadata = nullptr;
- HasInitialMetadata = true;
- }
-
- if (initialMetadata) {
- GetInitialMetadata(initialMetadata);
- }
-
- callback(std::move(status));
- }
-
- void OnStartDone(bool ok) {
- TReaderCallback callback;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Started = true;
- if (!ok || Cancelled) {
- ReadFinished = true;
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- return;
- }
- callback = std::move(Callback);
- Callback = nullptr;
- }
-
- callback({ }, typename TBase::TPtr(this));
- }
-
- void OnFinished(bool ok) {
- TGrpcStatus status;
- std::vector<TReadCallback> finishedCallbacks;
- TReaderCallback startCallback;
- TReadCallback readCallback;
- TReadCallback finishCallback;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
-
- Finished = true;
- FinishedOk = ok;
- LocalContext.reset();
-
- if (ok) {
- status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
-
- finishedCallbacks.swap(FinishedCallbacks);
-
- if (Callback) {
- Y_ABORT_UNLESS(!ReadActive);
- startCallback = std::move(Callback);
- Callback = nullptr;
- } else if (ReadActive) {
- if (ReadCallback) {
- readCallback = std::move(ReadCallback);
- ReadCallback = nullptr;
- } else {
- finishCallback = std::move(FinishCallback);
- FinishCallback = nullptr;
- }
- ReadActive = false;
- }
- }
-
- for (auto& finishedCallback : finishedCallbacks) {
- auto statusCopy = status;
- finishedCallback(std::move(statusCopy));
- }
-
- if (startCallback) {
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
- }
- startCallback(std::move(status), nullptr);
- } else if (readCallback) {
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
- for (const auto& [name, value] : Context.GetServerTrailingMetadata()) {
- status.ServerTrailingMetadata.emplace(
- TString(name.begin(), name.end()),
- TString(value.begin(), value.end()));
- }
- }
- readCallback(std::move(status));
- } else if (finishCallback) {
- finishCallback(std::move(status));
- }
- }
-
- TReaderCallback Callback;
- TAsyncReaderPtr Stream;
- using TFixedEvent = TQueueClientFixedEvent<TSelf>;
- std::mutex Mutex;
- TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };
- TFixedEvent OnStartDoneTag = { this, &TSelf::OnStartDone };
- TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };
-
- TReadCallback ReadCallback;
- TReadCallback FinishCallback;
- std::vector<TReadCallback> FinishedCallbacks;
- std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
- bool Started = false;
- bool HasInitialMetadata = false;
- bool ReadActive = false;
- bool ReadFinished = false;
- bool Finished = false;
- bool Cancelled = false;
- bool FinishedOk = false;
-};
-
-template<class TRequest, class TResponse>
-using TStreamConnectedCallback = std::function<void(TGrpcStatus&&, typename IStreamRequestReadWriteProcessor<TRequest, TResponse>::TPtr)>;
-
-template<class TStub, class TRequest, class TResponse>
-class TStreamRequestReadWriteProcessor
- : public IStreamRequestReadWriteProcessor<TRequest, TResponse>
- , public TGRpcRequestProcessorCommon {
-public:
- using TSelf = TStreamRequestReadWriteProcessor;
- using TBase = IStreamRequestReadWriteProcessor<TRequest, TResponse>;
- using TPtr = TIntrusivePtr<TSelf>;
- using TConnectedCallback = TStreamConnectedCallback<TRequest, TResponse>;
- using TReadCallback = typename TBase::TReadCallback;
- using TWriteCallback = typename TBase::TWriteCallback;
- using TAsyncReaderWriterPtr = std::unique_ptr<grpc::ClientAsyncReaderWriter<TRequest, TResponse>>;
- using TAsyncRequest = TAsyncReaderWriterPtr (TStub::*)(grpc::ClientContext*, grpc::CompletionQueue*, void*);
-
- explicit TStreamRequestReadWriteProcessor(TConnectedCallback&& callback)
- : ConnectedCallback(std::move(callback))
- {
- Y_ABORT_UNLESS(ConnectedCallback, "Missing connected callback");
- }
-
- void Cancel() override {
- Context.TryCancel();
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Cancelled = true;
- if (Started && !(ReadFinished && WriteFinished)) {
- if (!ReadActive) {
- ReadFinished = true;
- }
- if (!WriteActive) {
- WriteFinished = true;
- }
- if (ReadFinished && WriteFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- }
- }
-
- void Write(TRequest&& request, TWriteCallback callback) override {
- TGrpcStatus status;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- if (Cancelled || ReadFinished || WriteFinished) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
- } else if (WriteActive) {
- auto& item = WriteQueue.emplace_back();
- item.Callback.swap(callback);
- item.Request.Swap(&request);
- } else {
- WriteActive = true;
- WriteCallback.swap(callback);
- Stream->Write(request, OnWriteDoneTag.Prepare());
- }
- }
-
- if (!status.Ok() && callback) {
- callback(std::move(status));
- }
- }
-
- void ReadInitialMetadata(std::unordered_multimap<TString, TString>* metadata, TReadCallback callback) override {
- TGrpcStatus status;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished && !HasInitialMetadata) {
- ReadActive = true;
- ReadCallback = std::move(callback);
- InitialMetadata = metadata;
- if (!ReadFinished) {
- Stream->ReadInitialMetadata(OnReadDoneTag.Prepare());
- }
- return;
- }
- if (!HasInitialMetadata) {
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- } else {
- GetInitialMetadata(metadata);
- }
- }
-
- callback(std::move(status));
- }
-
- void Read(TResponse* message, TReadCallback callback) override {
- TGrpcStatus status;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished) {
- ReadActive = true;
- ReadCallback = std::move(callback);
- if (!ReadFinished) {
- Stream->Read(message, OnReadDoneTag.Prepare());
- }
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
- }
-
- callback(std::move(status));
- }
-
- void Finish(TReadCallback callback) override {
- TGrpcStatus status;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_ABORT_UNLESS(!ReadActive, "Multiple Read/Finish calls detected");
- if (!Finished) {
- ReadActive = true;
- FinishCallback = std::move(callback);
- if (!ReadFinished) {
- ReadFinished = true;
- if (!WriteActive) {
- WriteFinished = true;
- }
- if (WriteFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- return;
- }
- if (FinishedOk) {
- status = Status;
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- callback(std::move(status));
- }
-
- void AddFinishedCallback(TReadCallback callback) override {
- Y_ABORT_UNLESS(callback, "Unexpected empty callback");
-
- TGrpcStatus status;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- if (!Finished) {
- FinishedCallbacks.emplace_back().swap(callback);
- return;
- }
-
- if (FinishedOk) {
- status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
- }
-
- callback(std::move(status));
- }
-
-private:
- template<typename> friend class TServiceConnection;
-
- void Start(TStub& stub, TAsyncRequest asyncRequest, IQueueClientContextProvider* provider) {
- auto context = provider->CreateContext();
- if (!context) {
- auto callback = std::move(ConnectedCallback);
- TGrpcStatus status(grpc::StatusCode::CANCELLED, "Client is shutting down");
- callback(std::move(status), nullptr);
- return;
- }
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- LocalContext = context;
- Stream = (stub.*asyncRequest)(&Context, context->CompletionQueue(), OnConnectedTag.Prepare());
- }
-
- context->SubscribeStop([self = TPtr(this)] {
- self->Cancel();
- });
- }
-
-private:
- void OnConnected(bool ok) {
- TConnectedCallback callback;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Started = true;
- if (!ok || Cancelled) {
- ReadFinished = true;
- WriteFinished = true;
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- return;
- }
-
- callback = std::move(ConnectedCallback);
- ConnectedCallback = nullptr;
- }
-
- callback({ }, typename TBase::TPtr(this));
- }
-
- void OnReadDone(bool ok) {
- TGrpcStatus status;
- TReadCallback callback;
- std::unordered_multimap<TString, TString>* initialMetadata = nullptr;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_ABORT_UNLESS(ReadActive, "Unexpected Read done callback");
- Y_ABORT_UNLESS(!ReadFinished, "Unexpected ReadFinished flag");
-
- if (!ok || Cancelled || WriteFinished) {
- ReadFinished = true;
- if (!WriteActive) {
- WriteFinished = true;
- }
- if (WriteFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- if (!ok) {
- // Keep ReadActive=true, so callback is called
- // after the call is finished with an error
- return;
- }
- }
-
- callback = std::move(ReadCallback);
- ReadCallback = nullptr;
- ReadActive = false;
- initialMetadata = InitialMetadata;
- InitialMetadata = nullptr;
- HasInitialMetadata = true;
- }
-
- if (initialMetadata) {
- GetInitialMetadata(initialMetadata);
- }
-
- callback(std::move(status));
- }
-
- void OnWriteDone(bool ok) {
- TWriteCallback okCallback;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Y_ABORT_UNLESS(WriteActive, "Unexpected Write done callback");
- Y_ABORT_UNLESS(!WriteFinished, "Unexpected WriteFinished flag");
-
- if (ok) {
- okCallback.swap(WriteCallback);
- } else if (WriteCallback) {
- // Put callback back on the queue until OnFinished
- auto& item = WriteQueue.emplace_front();
- item.Callback.swap(WriteCallback);
- }
-
- if (!ok || Cancelled) {
- WriteActive = false;
- WriteFinished = true;
- if (!ReadActive) {
- ReadFinished = true;
- }
- if (ReadFinished) {
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- } else if (!WriteQueue.empty()) {
- WriteCallback.swap(WriteQueue.front().Callback);
- Stream->Write(WriteQueue.front().Request, OnWriteDoneTag.Prepare());
- WriteQueue.pop_front();
- } else {
- WriteActive = false;
- if (ReadFinished) {
- WriteFinished = true;
- Stream->Finish(&Status, OnFinishedTag.Prepare());
- }
- }
- }
-
- if (okCallback) {
- okCallback(TGrpcStatus());
- }
- }
-
- void OnFinished(bool ok) {
- TGrpcStatus status;
- std::deque<TWriteItem> writesDropped;
- std::vector<TReadCallback> finishedCallbacks;
- TConnectedCallback connectedCallback;
- TReadCallback readCallback;
- TReadCallback finishCallback;
-
- {
- std::unique_lock<std::mutex> guard(Mutex);
- Finished = true;
- FinishedOk = ok;
- LocalContext.reset();
-
- if (ok) {
- status = Status;
- } else if (Cancelled) {
- status = TGrpcStatus(grpc::StatusCode::CANCELLED, "Stream cancelled");
- } else {
- status = TGrpcStatus::Internal("Unexpected error");
- }
-
- writesDropped.swap(WriteQueue);
- finishedCallbacks.swap(FinishedCallbacks);
-
- if (ConnectedCallback) {
- Y_ABORT_UNLESS(!ReadActive);
- connectedCallback = std::move(ConnectedCallback);
- ConnectedCallback = nullptr;
- } else if (ReadActive) {
- if (ReadCallback) {
- readCallback = std::move(ReadCallback);
- ReadCallback = nullptr;
- } else {
- finishCallback = std::move(FinishCallback);
- FinishCallback = nullptr;
- }
- ReadActive = false;
- }
- }
-
- for (auto& item : writesDropped) {
- if (item.Callback) {
- TGrpcStatus writeStatus = status;
- if (writeStatus.Ok()) {
- writeStatus = TGrpcStatus(grpc::StatusCode::CANCELLED, "Write request dropped");
- }
- item.Callback(std::move(writeStatus));
- }
- }
-
- for (auto& finishedCallback : finishedCallbacks) {
- TGrpcStatus statusCopy = status;
- finishedCallback(std::move(statusCopy));
- }
-
- if (connectedCallback) {
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::UNKNOWN, "Unknown stream failure");
- }
- connectedCallback(std::move(status), nullptr);
- } else if (readCallback) {
- if (status.Ok()) {
- status = TGrpcStatus(grpc::StatusCode::OUT_OF_RANGE, "Read EOF");
- for (const auto& [name, value] : Context.GetServerTrailingMetadata()) {
- status.ServerTrailingMetadata.emplace(
- TString(name.begin(), name.end()),
- TString(value.begin(), value.end()));
- }
- }
- readCallback(std::move(status));
- } else if (finishCallback) {
- finishCallback(std::move(status));
- }
- }
-
-private:
- struct TWriteItem {
- TWriteCallback Callback;
- TRequest Request;
- };
-
-private:
- using TFixedEvent = TQueueClientFixedEvent<TSelf>;
-
- TFixedEvent OnConnectedTag = { this, &TSelf::OnConnected };
- TFixedEvent OnReadDoneTag = { this, &TSelf::OnReadDone };
- TFixedEvent OnWriteDoneTag = { this, &TSelf::OnWriteDone };
- TFixedEvent OnFinishedTag = { this, &TSelf::OnFinished };
-
-private:
- std::mutex Mutex;
- TAsyncReaderWriterPtr Stream;
- TConnectedCallback ConnectedCallback;
- TReadCallback ReadCallback;
- TReadCallback FinishCallback;
- std::vector<TReadCallback> FinishedCallbacks;
- std::deque<TWriteItem> WriteQueue;
- TWriteCallback WriteCallback;
- std::unordered_multimap<TString, TString>* InitialMetadata = nullptr;
- bool Started = false;
- bool HasInitialMetadata = false;
- bool ReadActive = false;
- bool ReadFinished = false;
- bool WriteActive = false;
- bool WriteFinished = false;
- bool Finished = false;
- bool Cancelled = false;
- bool FinishedOk = false;
-};
-
-class TGRpcClientLow;
-
-template<typename TGRpcService>
-class TServiceConnection {
- using TStub = typename TGRpcService::Stub;
- friend class TGRpcClientLow;
-
-public:
- /*
- * Start simple request
- */
- template<typename TRequest, typename TResponse>
- void DoRequest(const TRequest& request,
- TResponseCallback<TResponse> callback,
- typename TSimpleRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
- const TCallMeta& metas = { },
- IQueueClientContextProvider* provider = nullptr)
- {
- auto processor = MakeIntrusive<TSimpleRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));
- processor->ApplyMeta(metas);
- processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_);
- }
-
- /*
- * Start simple request
- */
- template<typename TRequest, typename TResponse>
- void DoAdvancedRequest(const TRequest& request,
- TAdvancedResponseCallback<TResponse> callback,
- typename TAdvancedRequestProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
- const TCallMeta& metas = { },
- IQueueClientContextProvider* provider = nullptr)
- {
- auto processor = MakeIntrusive<TAdvancedRequestProcessor<TStub, TRequest, TResponse>>(std::move(callback));
- processor->ApplyMeta(metas);
- processor->Start(*Stub_, asyncRequest, request, provider ? provider : Provider_);
- }
-
- /*
- * Start bidirectional streamming
- */
- template<typename TRequest, typename TResponse>
- void DoStreamRequest(TStreamConnectedCallback<TRequest, TResponse> callback,
- typename TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
- const TCallMeta& metas = { },
- IQueueClientContextProvider* provider = nullptr)
- {
- auto processor = MakeIntrusive<TStreamRequestReadWriteProcessor<TStub, TRequest, TResponse>>(std::move(callback));
- processor->ApplyMeta(metas);
- processor->Start(*Stub_, std::move(asyncRequest), provider ? provider : Provider_);
- }
-
- /*
- * Start streaming response reading (one request, many responses)
- */
- template<typename TRequest, typename TResponse>
- void DoStreamRequest(const TRequest& request,
- TStreamReaderCallback<TResponse> callback,
- typename TStreamRequestReadProcessor<TStub, TRequest, TResponse>::TAsyncRequest asyncRequest,
- const TCallMeta& metas = { },
- IQueueClientContextProvider* provider = nullptr)
- {
- auto processor = MakeIntrusive<TStreamRequestReadProcessor<TStub, TRequest, TResponse>>(std::move(callback));
- processor->ApplyMeta(metas);
- processor->Start(*Stub_, request, std::move(asyncRequest), provider ? provider : Provider_);
- }
-
-private:
- TServiceConnection(std::shared_ptr<grpc::ChannelInterface> ci,
- IQueueClientContextProvider* provider)
- : Stub_(TGRpcService::NewStub(ci))
- , Provider_(provider)
- {
- Y_ABORT_UNLESS(Provider_, "Connection does not have a queue provider");
- }
-
- TServiceConnection(TStubsHolder& holder,
- IQueueClientContextProvider* provider)
- : Stub_(holder.GetOrCreateStub<TStub>())
- , Provider_(provider)
- {
- Y_ABORT_UNLESS(Provider_, "Connection does not have a queue provider");
- }
-
- std::shared_ptr<TStub> Stub_;
- IQueueClientContextProvider* Provider_;
-};
-
-class TGRpcClientLow
- : public IQueueClientContextProvider
-{
- class TContextImpl;
- friend class TContextImpl;
-
- enum ECqState : TAtomicBase {
- WORKING = 0,
- STOP_SILENT = 1,
- STOP_EXPLICIT = 2,
- };
-
-public:
- explicit TGRpcClientLow(size_t numWorkerThread = DEFAULT_NUM_THREADS, bool useCompletionQueuePerThread = false);
- ~TGRpcClientLow();
-
- // Tries to stop all currently running requests (via their stop callbacks)
- // Will shutdown CQ and drain events once all requests have finished
- // No new requests may be started after this call
- void Stop(bool wait = false);
-
- // Waits until all currently running requests finish execution
- void WaitIdle();
-
- inline bool IsStopping() const {
- switch (GetCqState()) {
- case WORKING:
- return false;
- case STOP_SILENT:
- case STOP_EXPLICIT:
- return true;
- }
-
- Y_UNREACHABLE();
- }
-
- IQueueClientContextPtr CreateContext() override;
-
- template<typename TGRpcService>
- std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(const TGRpcClientConfig& config) {
- return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(CreateChannelInterface(config), this));
- }
-
- template<typename TGRpcService>
- std::unique_ptr<TServiceConnection<TGRpcService>> CreateGRpcServiceConnection(TStubsHolder& holder) {
- return std::unique_ptr<TServiceConnection<TGRpcService>>(new TServiceConnection<TGRpcService>(holder, this));
- }
-
- // Tests only, not thread-safe
- void AddWorkerThreadForTest();
-
-private:
- using IThreadRef = std::unique_ptr<IThreadFactory::IThread>;
- using CompletionQueueRef = std::unique_ptr<grpc::CompletionQueue>;
- void Init(size_t numWorkerThread);
-
- inline ECqState GetCqState() const { return (ECqState) AtomicGet(CqState_); }
- inline void SetCqState(ECqState state) { AtomicSet(CqState_, state); }
-
- void StopInternal(bool silent);
- void WaitInternal();
-
- void ForgetContext(TContextImpl* context);
-
-private:
- bool UseCompletionQueuePerThread_;
- std::vector<CompletionQueueRef> CQS_;
- std::vector<IThreadRef> WorkerThreads_;
- TAtomic CqState_ = -1;
-
- std::mutex Mtx_;
- std::condition_variable ContextsEmpty_;
- std::unordered_set<TContextImpl*> Contexts_;
-
- std::mutex JoinMutex_;
-};
-
-} // namespace NGRpc
diff --git a/library/cpp/grpc/client/grpc_common.h b/library/cpp/grpc/client/grpc_common.h
deleted file mode 100644
index d19e42d4ac..0000000000
--- a/library/cpp/grpc/client/grpc_common.h
+++ /dev/null
@@ -1,84 +0,0 @@
-#pragma once
-
-#include <grpc++/grpc++.h>
-#include <grpc++/resource_quota.h>
-
-#include <util/datetime/base.h>
-#include <unordered_map>
-#include <util/generic/string.h>
-
-constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;
-
-namespace NGrpc {
-
-struct TGRpcClientConfig {
- TString Locator; // format host:port
- TDuration Timeout = TDuration::Max(); // request timeout
- ui64 MaxMessageSize = DEFAULT_GRPC_MESSAGE_SIZE_LIMIT; // Max request and response size
- ui64 MaxInboundMessageSize = 0; // overrides MaxMessageSize for incoming requests
- ui64 MaxOutboundMessageSize = 0; // overrides MaxMessageSize for outgoing requests
- ui32 MaxInFlight = 0;
- bool EnableSsl = false;
- grpc::SslCredentialsOptions SslCredentials;
- grpc_compression_algorithm CompressionAlgoritm = GRPC_COMPRESS_NONE;
- ui64 MemQuota = 0;
- std::unordered_map<TString, TString> StringChannelParams;
- std::unordered_map<TString, int> IntChannelParams;
- TString LoadBalancingPolicy = { };
- TString SslTargetNameOverride = { };
-
- TGRpcClientConfig() = default;
- TGRpcClientConfig(const TGRpcClientConfig&) = default;
- TGRpcClientConfig(TGRpcClientConfig&&) = default;
- TGRpcClientConfig& operator=(const TGRpcClientConfig&) = default;
- TGRpcClientConfig& operator=(TGRpcClientConfig&&) = default;
-
- TGRpcClientConfig(const TString& locator, TDuration timeout = TDuration::Max(),
- ui64 maxMessageSize = DEFAULT_GRPC_MESSAGE_SIZE_LIMIT, ui32 maxInFlight = 0, const TString& caCert = "", const TString& clientCert = "",
- const TString& clientPrivateKey = "", grpc_compression_algorithm compressionAlgorithm = GRPC_COMPRESS_NONE, bool enableSsl = false)
- : Locator(locator)
- , Timeout(timeout)
- , MaxMessageSize(maxMessageSize)
- , MaxInFlight(maxInFlight)
- , EnableSsl(enableSsl)
- , SslCredentials{.pem_root_certs = caCert, .pem_private_key = clientPrivateKey, .pem_cert_chain = clientCert}
- , CompressionAlgoritm(compressionAlgorithm)
- {}
-};
-
-inline std::shared_ptr<grpc::ChannelInterface> CreateChannelInterface(const TGRpcClientConfig& config, grpc_socket_mutator* mutator = nullptr){
- grpc::ChannelArguments args;
- args.SetMaxReceiveMessageSize(config.MaxInboundMessageSize ? config.MaxInboundMessageSize : config.MaxMessageSize);
- args.SetMaxSendMessageSize(config.MaxOutboundMessageSize ? config.MaxOutboundMessageSize : config.MaxMessageSize);
- args.SetCompressionAlgorithm(config.CompressionAlgoritm);
-
- for (const auto& kvp: config.StringChannelParams) {
- args.SetString(kvp.first, kvp.second);
- }
-
- for (const auto& kvp: config.IntChannelParams) {
- args.SetInt(kvp.first, kvp.second);
- }
-
- if (config.MemQuota) {
- grpc::ResourceQuota quota;
- quota.Resize(config.MemQuota);
- args.SetResourceQuota(quota);
- }
- if (mutator) {
- args.SetSocketMutator(mutator);
- }
- if (!config.LoadBalancingPolicy.empty()) {
- args.SetLoadBalancingPolicyName(config.LoadBalancingPolicy);
- }
- if (!config.SslTargetNameOverride.empty()) {
- args.SetSslTargetNameOverride(config.SslTargetNameOverride);
- }
- if (config.EnableSsl || config.SslCredentials.pem_root_certs) {
- return grpc::CreateCustomChannel(config.Locator, grpc::SslCredentials(config.SslCredentials), args);
- } else {
- return grpc::CreateCustomChannel(config.Locator, grpc::InsecureChannelCredentials(), args);
- }
-}
-
-} // namespace NGRpc
diff --git a/library/cpp/grpc/client/ut/grpc_client_low_ut.cpp b/library/cpp/grpc/client/ut/grpc_client_low_ut.cpp
deleted file mode 100644
index b8af2a518f..0000000000
--- a/library/cpp/grpc/client/ut/grpc_client_low_ut.cpp
+++ /dev/null
@@ -1,61 +0,0 @@
-#include <library/cpp/grpc/client/grpc_client_low.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-using namespace NGrpc;
-
-class TTestStub {
-public:
- std::shared_ptr<grpc::ChannelInterface> ChannelInterface;
- TTestStub(std::shared_ptr<grpc::ChannelInterface> channelInterface)
- : ChannelInterface(channelInterface)
- {}
-};
-
-Y_UNIT_TEST_SUITE(ChannelPoolTests) {
- Y_UNIT_TEST(UnusedStubsHoldersDeletion) {
- TGRpcClientConfig clientConfig("invalid_host:invalid_port");
- TTcpKeepAliveSettings tcpKeepAliveSettings =
- {
- true,
- 30, // NYdb::TCP_KEEPALIVE_IDLE, unused in UT, but is necessary in constructor
- 5, // NYdb::TCP_KEEPALIVE_COUNT, unused in UT, but is necessary in constructor
- 10 // NYdb::TCP_KEEPALIVE_INTERVAL, unused in UT, but is necessary in constructor
- };
- auto channelPool = TChannelPool(tcpKeepAliveSettings, TDuration::MilliSeconds(250));
- std::vector<std::weak_ptr<grpc::ChannelInterface>> ChannelInterfacesWeak;
-
- {
- std::vector<std::shared_ptr<TTestStub>> stubsHoldersShared;
- auto storeStubsHolders = [&](TStubsHolder& stubsHolder) {
- stubsHoldersShared.emplace_back(stubsHolder.GetOrCreateStub<TTestStub>());
- ChannelInterfacesWeak.emplace_back((*stubsHoldersShared.rbegin())->ChannelInterface);
- return;
- };
- for (int i = 0; i < 10; ++i) {
- channelPool.GetStubsHolderLocked(
- ToString(i),
- clientConfig,
- storeStubsHolders
- );
- }
- }
-
- auto now = Now();
- while (Now() < now + TDuration::MilliSeconds(500)){
- Sleep(TDuration::MilliSeconds(100));
- }
-
- channelPool.DeleteExpiredStubsHolders();
-
- bool allDeleted = true;
- for (auto i = ChannelInterfacesWeak.begin(); i != ChannelInterfacesWeak.end(); ++i) {
- allDeleted = allDeleted && i->expired();
- }
-
- // assertion is made for channel interfaces instead of stubs, because after stub deletion
- // TStubsHolder has the only shared_ptr for channel interface.
- UNIT_ASSERT_C(allDeleted, "expired stubsHolders were not deleted after timeout");
-
- }
-} // ChannelPoolTests ut suite \ No newline at end of file
diff --git a/library/cpp/grpc/client/ut/ya.make b/library/cpp/grpc/client/ut/ya.make
deleted file mode 100644
index 45a88622f4..0000000000
--- a/library/cpp/grpc/client/ut/ya.make
+++ /dev/null
@@ -1,7 +0,0 @@
-UNITTEST_FOR(library/cpp/grpc/client)
-
-SRCS(
- grpc_client_low_ut.cpp
-)
-
-END()
diff --git a/library/cpp/grpc/client/ya.make b/library/cpp/grpc/client/ya.make
deleted file mode 100644
index a963a42abe..0000000000
--- a/library/cpp/grpc/client/ya.make
+++ /dev/null
@@ -1,16 +0,0 @@
-LIBRARY()
-
-SRCS(
- grpc_client_low.cpp
-)
-
-PEERDIR(
- contrib/libs/grpc
- library/cpp/deprecated/atomic
-)
-
-END()
-
-RECURSE_FOR_TESTS(
- ut
-)
diff --git a/library/cpp/grpc/server/CMakeLists.darwin-arm64.txt b/library/cpp/grpc/server/CMakeLists.darwin-arm64.txt
deleted file mode 100644
index 1db6667068..0000000000
--- a/library/cpp/grpc/server/CMakeLists.darwin-arm64.txt
+++ /dev/null
@@ -1,35 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-add_subdirectory(actors)
-get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
-
-add_library(cpp-grpc-server)
-target_link_libraries(cpp-grpc-server PUBLIC
- contrib-libs-cxxsupp
- yutil
- tools-enum_parser-enum_serialization_runtime
- contrib-libs-grpc
- monlib-dynamic_counters-percentile
-)
-target_sources(cpp-grpc-server PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/event_callback.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_request.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_server.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_counters.cpp
-)
-generate_enum_serilization(cpp-grpc-server
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_request_base.h
- INCLUDE_HEADERS
- library/cpp/grpc/server/grpc_request_base.h
-)
diff --git a/library/cpp/grpc/server/CMakeLists.darwin-x86_64.txt b/library/cpp/grpc/server/CMakeLists.darwin-x86_64.txt
deleted file mode 100644
index 1db6667068..0000000000
--- a/library/cpp/grpc/server/CMakeLists.darwin-x86_64.txt
+++ /dev/null
@@ -1,35 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-add_subdirectory(actors)
-get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
-
-add_library(cpp-grpc-server)
-target_link_libraries(cpp-grpc-server PUBLIC
- contrib-libs-cxxsupp
- yutil
- tools-enum_parser-enum_serialization_runtime
- contrib-libs-grpc
- monlib-dynamic_counters-percentile
-)
-target_sources(cpp-grpc-server PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/event_callback.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_request.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_server.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_counters.cpp
-)
-generate_enum_serilization(cpp-grpc-server
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_request_base.h
- INCLUDE_HEADERS
- library/cpp/grpc/server/grpc_request_base.h
-)
diff --git a/library/cpp/grpc/server/CMakeLists.linux-aarch64.txt b/library/cpp/grpc/server/CMakeLists.linux-aarch64.txt
deleted file mode 100644
index afeac13dc1..0000000000
--- a/library/cpp/grpc/server/CMakeLists.linux-aarch64.txt
+++ /dev/null
@@ -1,36 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-add_subdirectory(actors)
-get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
-
-add_library(cpp-grpc-server)
-target_link_libraries(cpp-grpc-server PUBLIC
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
- tools-enum_parser-enum_serialization_runtime
- contrib-libs-grpc
- monlib-dynamic_counters-percentile
-)
-target_sources(cpp-grpc-server PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/event_callback.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_request.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_server.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_counters.cpp
-)
-generate_enum_serilization(cpp-grpc-server
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_request_base.h
- INCLUDE_HEADERS
- library/cpp/grpc/server/grpc_request_base.h
-)
diff --git a/library/cpp/grpc/server/CMakeLists.linux-x86_64.txt b/library/cpp/grpc/server/CMakeLists.linux-x86_64.txt
deleted file mode 100644
index afeac13dc1..0000000000
--- a/library/cpp/grpc/server/CMakeLists.linux-x86_64.txt
+++ /dev/null
@@ -1,36 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-add_subdirectory(actors)
-get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
-
-add_library(cpp-grpc-server)
-target_link_libraries(cpp-grpc-server PUBLIC
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
- tools-enum_parser-enum_serialization_runtime
- contrib-libs-grpc
- monlib-dynamic_counters-percentile
-)
-target_sources(cpp-grpc-server PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/event_callback.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_request.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_server.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_counters.cpp
-)
-generate_enum_serilization(cpp-grpc-server
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_request_base.h
- INCLUDE_HEADERS
- library/cpp/grpc/server/grpc_request_base.h
-)
diff --git a/library/cpp/grpc/server/CMakeLists.txt b/library/cpp/grpc/server/CMakeLists.txt
deleted file mode 100644
index 2dce3a77fe..0000000000
--- a/library/cpp/grpc/server/CMakeLists.txt
+++ /dev/null
@@ -1,19 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-aarch64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
- include(CMakeLists.darwin-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64")
- include(CMakeLists.darwin-arm64.txt)
-elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
- include(CMakeLists.windows-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-x86_64.txt)
-endif()
diff --git a/library/cpp/grpc/server/CMakeLists.windows-x86_64.txt b/library/cpp/grpc/server/CMakeLists.windows-x86_64.txt
deleted file mode 100644
index 1db6667068..0000000000
--- a/library/cpp/grpc/server/CMakeLists.windows-x86_64.txt
+++ /dev/null
@@ -1,35 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-add_subdirectory(actors)
-get_built_tool_path(
- TOOL_enum_parser_bin
- TOOL_enum_parser_dependency
- tools/enum_parser/enum_parser
- enum_parser
-)
-
-add_library(cpp-grpc-server)
-target_link_libraries(cpp-grpc-server PUBLIC
- contrib-libs-cxxsupp
- yutil
- tools-enum_parser-enum_serialization_runtime
- contrib-libs-grpc
- monlib-dynamic_counters-percentile
-)
-target_sources(cpp-grpc-server PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/event_callback.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_request.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_server.cpp
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_counters.cpp
-)
-generate_enum_serilization(cpp-grpc-server
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/grpc_request_base.h
- INCLUDE_HEADERS
- library/cpp/grpc/server/grpc_request_base.h
-)
diff --git a/library/cpp/grpc/server/actors/CMakeLists.darwin-arm64.txt b/library/cpp/grpc/server/actors/CMakeLists.darwin-arm64.txt
deleted file mode 100644
index 1e710ad49a..0000000000
--- a/library/cpp/grpc/server/actors/CMakeLists.darwin-arm64.txt
+++ /dev/null
@@ -1,18 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(grpc-server-actors)
-target_link_libraries(grpc-server-actors PUBLIC
- contrib-libs-cxxsupp
- yutil
- cpp-actors-core
-)
-target_sources(grpc-server-actors PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/actors/logger.cpp
-)
diff --git a/library/cpp/grpc/server/actors/CMakeLists.darwin-x86_64.txt b/library/cpp/grpc/server/actors/CMakeLists.darwin-x86_64.txt
deleted file mode 100644
index 1e710ad49a..0000000000
--- a/library/cpp/grpc/server/actors/CMakeLists.darwin-x86_64.txt
+++ /dev/null
@@ -1,18 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(grpc-server-actors)
-target_link_libraries(grpc-server-actors PUBLIC
- contrib-libs-cxxsupp
- yutil
- cpp-actors-core
-)
-target_sources(grpc-server-actors PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/actors/logger.cpp
-)
diff --git a/library/cpp/grpc/server/actors/CMakeLists.linux-aarch64.txt b/library/cpp/grpc/server/actors/CMakeLists.linux-aarch64.txt
deleted file mode 100644
index 0d8763b93d..0000000000
--- a/library/cpp/grpc/server/actors/CMakeLists.linux-aarch64.txt
+++ /dev/null
@@ -1,19 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(grpc-server-actors)
-target_link_libraries(grpc-server-actors PUBLIC
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
- cpp-actors-core
-)
-target_sources(grpc-server-actors PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/actors/logger.cpp
-)
diff --git a/library/cpp/grpc/server/actors/CMakeLists.linux-x86_64.txt b/library/cpp/grpc/server/actors/CMakeLists.linux-x86_64.txt
deleted file mode 100644
index 0d8763b93d..0000000000
--- a/library/cpp/grpc/server/actors/CMakeLists.linux-x86_64.txt
+++ /dev/null
@@ -1,19 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(grpc-server-actors)
-target_link_libraries(grpc-server-actors PUBLIC
- contrib-libs-linux-headers
- contrib-libs-cxxsupp
- yutil
- cpp-actors-core
-)
-target_sources(grpc-server-actors PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/actors/logger.cpp
-)
diff --git a/library/cpp/grpc/server/actors/CMakeLists.txt b/library/cpp/grpc/server/actors/CMakeLists.txt
deleted file mode 100644
index 2dce3a77fe..0000000000
--- a/library/cpp/grpc/server/actors/CMakeLists.txt
+++ /dev/null
@@ -1,19 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-aarch64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
- include(CMakeLists.darwin-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64")
- include(CMakeLists.darwin-arm64.txt)
-elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
- include(CMakeLists.windows-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
- include(CMakeLists.linux-x86_64.txt)
-endif()
diff --git a/library/cpp/grpc/server/actors/CMakeLists.windows-x86_64.txt b/library/cpp/grpc/server/actors/CMakeLists.windows-x86_64.txt
deleted file mode 100644
index 1e710ad49a..0000000000
--- a/library/cpp/grpc/server/actors/CMakeLists.windows-x86_64.txt
+++ /dev/null
@@ -1,18 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(grpc-server-actors)
-target_link_libraries(grpc-server-actors PUBLIC
- contrib-libs-cxxsupp
- yutil
- cpp-actors-core
-)
-target_sources(grpc-server-actors PRIVATE
- ${CMAKE_SOURCE_DIR}/library/cpp/grpc/server/actors/logger.cpp
-)
diff --git a/library/cpp/grpc/server/actors/logger.cpp b/library/cpp/grpc/server/actors/logger.cpp
deleted file mode 100644
index 5233c0cb9a..0000000000
--- a/library/cpp/grpc/server/actors/logger.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-#include "logger.h"
-
-namespace NGrpc {
-namespace {
-
-static_assert(
- ui16(TLOG_EMERG) == ui16(NActors::NLog::PRI_EMERG) &&
- ui16(TLOG_DEBUG) == ui16(NActors::NLog::PRI_DEBUG),
- "log levels in the library/log and library/cpp/actors don't match");
-
-class TActorSystemLogger final: public TLogger {
-public:
- TActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) noexcept
- : ActorSystem_{as}
- , Component_{component}
- {
- }
-
-protected:
- bool DoIsEnabled(ELogPriority p) const noexcept override {
- const auto* settings = static_cast<::NActors::NLog::TSettings*>(ActorSystem_.LoggerSettings());
- const auto priority = static_cast<::NActors::NLog::EPriority>(p);
-
- return settings && settings->Satisfies(priority, Component_, 0);
- }
-
- void DoWrite(ELogPriority p, const char* format, va_list args) noexcept override {
- Y_DEBUG_ABORT_UNLESS(DoIsEnabled(p));
-
- const auto priority = static_cast<::NActors::NLog::EPriority>(p);
- ::NActors::MemLogAdapter(ActorSystem_, priority, Component_, format, args);
- }
-
-private:
- NActors::TActorSystem& ActorSystem_;
- NActors::NLog::EComponent Component_;
-};
-
-} // namespace
-
-TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component) {
- return MakeIntrusive<TActorSystemLogger>(as, component);
-}
-
-} // namespace NGrpc
diff --git a/library/cpp/grpc/server/actors/logger.h b/library/cpp/grpc/server/actors/logger.h
deleted file mode 100644
index abf9270f7b..0000000000
--- a/library/cpp/grpc/server/actors/logger.h
+++ /dev/null
@@ -1,11 +0,0 @@
-#pragma once
-
-#include <library/cpp/actors/core/actorsystem.h>
-#include <library/cpp/actors/core/log.h>
-#include <library/cpp/grpc/server/logger.h>
-
-namespace NGrpc {
-
-TLoggerPtr CreateActorSystemLogger(NActors::TActorSystem& as, NActors::NLog::EComponent component);
-
-} // namespace NGrpc
diff --git a/library/cpp/grpc/server/actors/ya.make b/library/cpp/grpc/server/actors/ya.make
deleted file mode 100644
index a8d4b40373..0000000000
--- a/library/cpp/grpc/server/actors/ya.make
+++ /dev/null
@@ -1,11 +0,0 @@
-LIBRARY()
-
-SRCS(
- logger.cpp
-)
-
-PEERDIR(
- library/cpp/actors/core
-)
-
-END()
diff --git a/library/cpp/grpc/server/event_callback.cpp b/library/cpp/grpc/server/event_callback.cpp
deleted file mode 100644
index f423836bd6..0000000000
--- a/library/cpp/grpc/server/event_callback.cpp
+++ /dev/null
@@ -1 +0,0 @@
-#include "event_callback.h"
diff --git a/library/cpp/grpc/server/event_callback.h b/library/cpp/grpc/server/event_callback.h
deleted file mode 100644
index d0b700b3c9..0000000000
--- a/library/cpp/grpc/server/event_callback.h
+++ /dev/null
@@ -1,80 +0,0 @@
-#pragma once
-
-#include "grpc_server.h"
-
-namespace NGrpc {
-
-enum class EQueueEventStatus {
- OK,
- ERROR
-};
-
-template<class TCallback>
-class TQueueEventCallback: public IQueueEvent {
-public:
- TQueueEventCallback(const TCallback& callback)
- : Callback(callback)
- {}
-
- TQueueEventCallback(TCallback&& callback)
- : Callback(std::move(callback))
- {}
-
- bool Execute(bool ok) override {
- Callback(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR);
- return false;
- }
-
- void DestroyRequest() override {
- delete this;
- }
-
-private:
- TCallback Callback;
-};
-
-// Implementation of IQueueEvent that reduces allocations
-template<class TSelf>
-class TQueueFixedEvent: private IQueueEvent {
- using TCallback = void (TSelf::*)(EQueueEventStatus);
-
-public:
- TQueueFixedEvent(TSelf* self, TCallback callback)
- : Self(self)
- , Callback(callback)
- { }
-
- IQueueEvent* Prepare() {
- Self->Ref();
- return this;
- }
-
-private:
- bool Execute(bool ok) override {
- ((*Self).*Callback)(ok ? EQueueEventStatus::OK : EQueueEventStatus::ERROR);
- return false;
- }
-
- void DestroyRequest() override {
- Self->UnRef();
- }
-
-private:
- TSelf* const Self;
- TCallback const Callback;
-};
-
-template<class TCallback>
-inline IQueueEvent* MakeQueueEventCallback(TCallback&& callback) {
- return new TQueueEventCallback<TCallback>(std::forward<TCallback>(callback));
-}
-
-template<class T>
-inline IQueueEvent* MakeQueueEventCallback(T* self, void (T::*method)(EQueueEventStatus)) {
- using TPtr = TIntrusivePtr<T>;
- return MakeQueueEventCallback([self = TPtr(self), method] (EQueueEventStatus status) {
- ((*self).*method)(status);
- });
-}
-
-} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_async_ctx_base.h b/library/cpp/grpc/server/grpc_async_ctx_base.h
deleted file mode 100644
index d2461b8b0d..0000000000
--- a/library/cpp/grpc/server/grpc_async_ctx_base.h
+++ /dev/null
@@ -1,109 +0,0 @@
-#pragma once
-
-#include "grpc_server.h"
-
-#include <library/cpp/string_utils/quote/quote.h>
-
-#include <util/generic/vector.h>
-#include <util/generic/string.h>
-#include <util/system/yassert.h>
-#include <util/generic/set.h>
-
-#include <grpc++/server.h>
-#include <grpc++/server_context.h>
-
-#include <chrono>
-
-namespace NGrpc {
-
-template<typename TService>
-class TBaseAsyncContext: public ICancelableContext {
-public:
- TBaseAsyncContext(typename TService::TCurrentGRpcService::AsyncService* service, grpc::ServerCompletionQueue* cq)
- : Service(service)
- , CQ(cq)
- {
- }
-
- TString GetPeerName() const {
- // Decode URL-encoded square brackets
- auto ip = Context.peer();
- CGIUnescape(ip);
- return ip;
- }
-
- TInstant Deadline() const {
- // The timeout transferred in "grpc-timeout" header [1] and calculated from the deadline
- // right before the request is getting to be send.
- // 1. https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
- //
- // After this timeout calculated back to the deadline on the server side
- // using server grpc GPR_CLOCK_MONOTONIC time (raw_deadline() method).
- // deadline() method convert this to epoch related deadline GPR_CLOCK_REALTIME
- //
-
- std::chrono::system_clock::time_point t = Context.deadline();
- if (t == std::chrono::system_clock::time_point::max()) {
- return TInstant::Max();
- }
- auto us = std::chrono::time_point_cast<std::chrono::microseconds>(t);
- return TInstant::MicroSeconds(us.time_since_epoch().count());
- }
-
- TSet<TStringBuf> GetPeerMetaKeys() const {
- TSet<TStringBuf> keys;
- for (const auto& [key, _]: Context.client_metadata()) {
- keys.emplace(key.data(), key.size());
- }
- return keys;
- }
-
- TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const {
- const auto& clientMetadata = Context.client_metadata();
- const auto range = clientMetadata.equal_range(grpc::string_ref{key.data(), key.size()});
- if (range.first == range.second) {
- return {};
- }
-
- TVector<TStringBuf> values;
- values.reserve(std::distance(range.first, range.second));
-
- for (auto it = range.first; it != range.second; ++it) {
- values.emplace_back(it->second.data(), it->second.size());
- }
- return values;
- }
-
- TVector<TStringBuf> FindClientCert() const {
- auto authContext = Context.auth_context();
-
- TVector<TStringBuf> values;
- for (auto& value: authContext->FindPropertyValues(GRPC_X509_PEM_CERT_PROPERTY_NAME)) {
- values.emplace_back(value.data(), value.size());
- }
- return values;
- }
-
- grpc_compression_level GetCompressionLevel() const {
- return Context.compression_level();
- }
-
- void Shutdown() override {
- // Shutdown may only be called after request has started successfully
- if (Context.c_call())
- Context.TryCancel();
- }
-
-protected:
- //! The means of communication with the gRPC runtime for an asynchronous
- //! server.
- typename TService::TCurrentGRpcService::AsyncService* const Service;
- //! The producer-consumer queue where for asynchronous server notifications.
- grpc::ServerCompletionQueue* const CQ;
- //! Context for the rpc, allowing to tweak aspects of it such as the use
- //! of compression, authentication, as well as to send metadata back to the
- //! client.
- grpc::ServerContext Context;
-};
-
-} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_counters.cpp b/library/cpp/grpc/server/grpc_counters.cpp
deleted file mode 100644
index fa96e0100b..0000000000
--- a/library/cpp/grpc/server/grpc_counters.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-#include "grpc_counters.h"
-
-namespace NGrpc {
-namespace {
-
-class TFakeCounterBlock final: public ICounterBlock {
-private:
- void CountNotOkRequest() override {
- }
-
- void CountNotOkResponse() override {
- }
-
- void CountNotAuthenticated() override {
- }
-
- void CountResourceExhausted() override {
- }
-
- void CountRequestBytes(ui32 /*requestSize*/) override {
- }
-
- void CountResponseBytes(ui32 /*responseSize*/) override {
- }
-
- void StartProcessing(ui32 /*requestSize*/) override {
- }
-
- void FinishProcessing(
- ui32 /*requestSize*/,
- ui32 /*responseSize*/,
- bool /*ok*/,
- ui32 /*status*/,
- TDuration /*requestDuration*/) override
- {
- }
-};
-
-} // namespace
-
-ICounterBlockPtr FakeCounterBlock() {
- return MakeIntrusive<TFakeCounterBlock>();
-}
-
-} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_counters.h b/library/cpp/grpc/server/grpc_counters.h
deleted file mode 100644
index 0b6c36c84c..0000000000
--- a/library/cpp/grpc/server/grpc_counters.h
+++ /dev/null
@@ -1,136 +0,0 @@
-#pragma once
-
-#include <library/cpp/monlib/dynamic_counters/percentile/percentile.h>
-#include <library/cpp/monlib/dynamic_counters/counters.h>
-#include <util/generic/ptr.h>
-
-namespace NGrpc {
-
-struct ICounterBlock : public TThrRefBase {
- virtual void CountNotOkRequest() = 0;
- virtual void CountNotOkResponse() = 0;
- virtual void CountNotAuthenticated() = 0;
- virtual void CountResourceExhausted() = 0;
- virtual void CountRequestBytes(ui32 requestSize) = 0;
- virtual void CountResponseBytes(ui32 responseSize) = 0;
- virtual void StartProcessing(ui32 requestSize) = 0;
- virtual void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status, TDuration requestDuration) = 0;
- virtual void CountRequestsWithoutDatabase() {}
- virtual void CountRequestsWithoutToken() {}
- virtual void CountRequestWithoutTls() {}
-
- virtual TIntrusivePtr<ICounterBlock> Clone() { return this; }
- virtual void UseDatabase(const TString& database) { Y_UNUSED(database); }
-};
-
-using ICounterBlockPtr = TIntrusivePtr<ICounterBlock>;
-
-class TCounterBlock final : public ICounterBlock {
- NMonitoring::TDynamicCounters::TCounterPtr TotalCounter;
- NMonitoring::TDynamicCounters::TCounterPtr InflyCounter;
- NMonitoring::TDynamicCounters::TCounterPtr NotOkRequestCounter;
- NMonitoring::TDynamicCounters::TCounterPtr NotOkResponseCounter;
- NMonitoring::TDynamicCounters::TCounterPtr RequestBytes;
- NMonitoring::TDynamicCounters::TCounterPtr InflyRequestBytes;
- NMonitoring::TDynamicCounters::TCounterPtr ResponseBytes;
- NMonitoring::TDynamicCounters::TCounterPtr NotAuthenticated;
- NMonitoring::TDynamicCounters::TCounterPtr ResourceExhausted;
- bool Percentile = false;
- NMonitoring::TPercentileTracker<4, 512, 15> RequestHistMs;
- std::array<NMonitoring::TDynamicCounters::TCounterPtr, 2> GRpcStatusCounters;
-
-public:
- TCounterBlock(NMonitoring::TDynamicCounters::TCounterPtr totalCounter,
- NMonitoring::TDynamicCounters::TCounterPtr inflyCounter,
- NMonitoring::TDynamicCounters::TCounterPtr notOkRequestCounter,
- NMonitoring::TDynamicCounters::TCounterPtr notOkResponseCounter,
- NMonitoring::TDynamicCounters::TCounterPtr requestBytes,
- NMonitoring::TDynamicCounters::TCounterPtr inflyRequestBytes,
- NMonitoring::TDynamicCounters::TCounterPtr responseBytes,
- NMonitoring::TDynamicCounters::TCounterPtr notAuthenticated,
- NMonitoring::TDynamicCounters::TCounterPtr resourceExhausted,
- TIntrusivePtr<NMonitoring::TDynamicCounters> group)
- : TotalCounter(std::move(totalCounter))
- , InflyCounter(std::move(inflyCounter))
- , NotOkRequestCounter(std::move(notOkRequestCounter))
- , NotOkResponseCounter(std::move(notOkResponseCounter))
- , RequestBytes(std::move(requestBytes))
- , InflyRequestBytes(std::move(inflyRequestBytes))
- , ResponseBytes(std::move(responseBytes))
- , NotAuthenticated(std::move(notAuthenticated))
- , ResourceExhausted(std::move(resourceExhausted))
- {
- if (group) {
- RequestHistMs.Initialize(group, "event", "request", "ms", {0.5f, 0.9f, 0.99f, 0.999f, 1.0f});
- Percentile = true;
- }
- }
-
- void CountNotOkRequest() override {
- NotOkRequestCounter->Inc();
- }
-
- void CountNotOkResponse() override {
- NotOkResponseCounter->Inc();
- }
-
- void CountNotAuthenticated() override {
- NotAuthenticated->Inc();
- }
-
- void CountResourceExhausted() override {
- ResourceExhausted->Inc();
- }
-
- void CountRequestBytes(ui32 requestSize) override {
- *RequestBytes += requestSize;
- }
-
- void CountResponseBytes(ui32 responseSize) override {
- *ResponseBytes += responseSize;
- }
-
- void StartProcessing(ui32 requestSize) override {
- TotalCounter->Inc();
- InflyCounter->Inc();
- *RequestBytes += requestSize;
- *InflyRequestBytes += requestSize;
- }
-
- void FinishProcessing(ui32 requestSize, ui32 responseSize, bool ok, ui32 status,
- TDuration requestDuration) override
- {
- Y_UNUSED(status);
-
- InflyCounter->Dec();
- *InflyRequestBytes -= requestSize;
- *ResponseBytes += responseSize;
- if (!ok) {
- NotOkResponseCounter->Inc();
- }
- if (Percentile) {
- RequestHistMs.Increment(requestDuration.MilliSeconds());
- }
- }
-
- ICounterBlockPtr Clone() override {
- return this;
- }
-
- void Update() {
- if (Percentile) {
- RequestHistMs.Update();
- }
- }
-};
-
-using TCounterBlockPtr = TIntrusivePtr<TCounterBlock>;
-
-/**
- * Creates new instance of ICounterBlock implementation which does nothing.
- *
- * @return new instance
- */
-ICounterBlockPtr FakeCounterBlock();
-
-} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_request.cpp b/library/cpp/grpc/server/grpc_request.cpp
deleted file mode 100644
index 7030992173..0000000000
--- a/library/cpp/grpc/server/grpc_request.cpp
+++ /dev/null
@@ -1,59 +0,0 @@
-#include "grpc_request.h"
-
-namespace NGrpc {
-
-const char* GRPC_USER_AGENT_HEADER = "user-agent";
-
-class TStreamAdaptor: public IStreamAdaptor {
-public:
- TStreamAdaptor()
- : StreamIsReady_(true)
- {}
-
- void Enqueue(std::function<void()>&& fn, bool urgent) override {
- with_lock(Mtx_) {
- if (!UrgentQueue_.empty() || !NormalQueue_.empty()) {
- Y_ABORT_UNLESS(!StreamIsReady_);
- }
- auto& queue = urgent ? UrgentQueue_ : NormalQueue_;
- if (StreamIsReady_ && queue.empty()) {
- StreamIsReady_ = false;
- } else {
- queue.push_back(std::move(fn));
- return;
- }
- }
- fn();
- }
-
- size_t ProcessNext() override {
- size_t left = 0;
- std::function<void()> fn;
- with_lock(Mtx_) {
- Y_ABORT_UNLESS(!StreamIsReady_);
- auto& queue = UrgentQueue_.empty() ? NormalQueue_ : UrgentQueue_;
- if (queue.empty()) {
- // Both queues are empty
- StreamIsReady_ = true;
- } else {
- fn = std::move(queue.front());
- queue.pop_front();
- left = UrgentQueue_.size() + NormalQueue_.size();
- }
- }
- if (fn)
- fn();
- return left;
- }
-private:
- bool StreamIsReady_;
- TList<std::function<void()>> NormalQueue_;
- TList<std::function<void()>> UrgentQueue_;
- TMutex Mtx_;
-};
-
-IStreamAdaptor::TPtr CreateStreamAdaptor() {
- return std::make_unique<TStreamAdaptor>();
-}
-
-} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_request.h b/library/cpp/grpc/server/grpc_request.h
deleted file mode 100644
index 5c4cf7c2b8..0000000000
--- a/library/cpp/grpc/server/grpc_request.h
+++ /dev/null
@@ -1,603 +0,0 @@
-#pragma once
-
-#include <google/protobuf/text_format.h>
-#include <google/protobuf/arena.h>
-#include <google/protobuf/message.h>
-
-#include <library/cpp/monlib/dynamic_counters/counters.h>
-#include <library/cpp/logger/priority.h>
-#include <library/cpp/string_utils/quote/quote.h>
-
-#include "grpc_response.h"
-#include "event_callback.h"
-#include "grpc_async_ctx_base.h"
-#include "grpc_counters.h"
-#include "grpc_request_base.h"
-#include "grpc_server.h"
-#include "logger.h"
-
-#include <util/system/hp_timer.h>
-
-#include <grpc++/server.h>
-#include <grpc++/server_context.h>
-#include <grpc++/support/async_stream.h>
-#include <grpc++/support/async_unary_call.h>
-#include <grpc++/support/byte_buffer.h>
-#include <grpc++/impl/codegen/async_stream.h>
-
-namespace NGrpc {
-
-class IStreamAdaptor {
-public:
- using TPtr = std::unique_ptr<IStreamAdaptor>;
- virtual void Enqueue(std::function<void()>&& fn, bool urgent) = 0;
- virtual size_t ProcessNext() = 0;
- virtual ~IStreamAdaptor() = default;
-};
-
-IStreamAdaptor::TPtr CreateStreamAdaptor();
-
-///////////////////////////////////////////////////////////////////////////////
-template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter, typename TOutProtoPrinter>
-class TGRpcRequestImpl
- : public TBaseAsyncContext<TService>
- , public IQueueEvent
- , public IRequestContextBase
-{
- using TThis = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>;
-
-public:
- using TOnRequest = std::function<void (IRequestContextBase* ctx)>;
- using TRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,
- grpc::ServerAsyncResponseWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);
- using TStreamRequestCallback = void (TService::TCurrentGRpcService::AsyncService::*)(grpc::ServerContext*, TIn*,
- grpc::ServerAsyncWriter<TOut>*, grpc::CompletionQueue*, grpc::ServerCompletionQueue*, void*);
-
- TGRpcRequestImpl(TService* server,
- typename TService::TCurrentGRpcService::AsyncService* service,
- grpc::ServerCompletionQueue* cq,
- TOnRequest cb,
- TRequestCallback requestCallback,
- const char* name,
- TLoggerPtr logger,
- ICounterBlockPtr counters,
- IGRpcRequestLimiterPtr limiter)
- : TBaseAsyncContext<TService>(service, cq)
- , Server_(server)
- , Cb_(cb)
- , RequestCallback_(requestCallback)
- , StreamRequestCallback_(nullptr)
- , Name_(name)
- , Logger_(std::move(logger))
- , Counters_(std::move(counters))
- , RequestLimiter_(std::move(limiter))
- , Writer_(new grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>(&this->Context))
- , StateFunc_(&TThis::SetRequestDone)
- {
- AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
- Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
- Y_ABORT_UNLESS(Request_);
- GRPC_LOG_DEBUG(Logger_, "[%p] created request Name# %s", this, Name_);
- FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
- }
-
- TGRpcRequestImpl(TService* server,
- typename TService::TCurrentGRpcService::AsyncService* service,
- grpc::ServerCompletionQueue* cq,
- TOnRequest cb,
- TStreamRequestCallback requestCallback,
- const char* name,
- TLoggerPtr logger,
- ICounterBlockPtr counters,
- IGRpcRequestLimiterPtr limiter)
- : TBaseAsyncContext<TService>(service, cq)
- , Server_(server)
- , Cb_(cb)
- , RequestCallback_(nullptr)
- , StreamRequestCallback_(requestCallback)
- , Name_(name)
- , Logger_(std::move(logger))
- , Counters_(std::move(counters))
- , RequestLimiter_(std::move(limiter))
- , StreamWriter_(new grpc::ServerAsyncWriter<TUniversalResponse<TOut>>(&this->Context))
- , StateFunc_(&TThis::SetRequestDone)
- {
- AuthState_ = Server_->NeedAuth() ? TAuthState(true) : TAuthState(false);
- Request_ = google::protobuf::Arena::CreateMessage<TIn>(&Arena_);
- Y_ABORT_UNLESS(Request_);
- GRPC_LOG_DEBUG(Logger_, "[%p] created streaming request Name# %s", this, Name_);
- FinishPromise_ = NThreading::NewPromise<EFinishStatus>();
- StreamAdaptor_ = CreateStreamAdaptor();
- }
-
- TAsyncFinishResult GetFinishFuture() override {
- return FinishPromise_.GetFuture();
- }
-
- bool IsClientLost() const override {
- return ClientLost_.load();
- }
-
- TString GetPeer() const override {
- // Decode URL-encoded square brackets
- auto ip = TString(this->Context.peer());
- CGIUnescape(ip);
- return ip;
- }
-
- bool SslServer() const override {
- return Server_->SslServer();
- }
-
- void Run() {
- // Start request unless server is shutting down
- if (auto guard = Server_->ProtectShutdown()) {
- Ref(); //For grpc c runtime
- this->Context.AsyncNotifyWhenDone(OnFinishTag.Prepare());
- OnBeforeCall();
- if (RequestCallback_) {
- (this->Service->*RequestCallback_)
- (&this->Context, Request_,
- reinterpret_cast<grpc::ServerAsyncResponseWriter<TOut>*>(Writer_.Get()), this->CQ, this->CQ, GetGRpcTag());
- } else {
- (this->Service->*StreamRequestCallback_)
- (&this->Context, Request_,
- reinterpret_cast<grpc::ServerAsyncWriter<TOut>*>(StreamWriter_.Get()), this->CQ, this->CQ, GetGRpcTag());
- }
- }
- }
-
- ~TGRpcRequestImpl() {
- // No direct dtor call allowed
- Y_ASSERT(RefCount() == 0);
- }
-
- bool Execute(bool ok) override {
- return (this->*StateFunc_)(ok);
- }
-
- void DestroyRequest() override {
- Y_ABORT_UNLESS(!CallInProgress_, "Unexpected DestroyRequest while another grpc call is still in progress");
- RequestDestroyed_ = true;
- if (RequestRegistered_) {
- Server_->DeregisterRequestCtx(this);
- RequestRegistered_ = false;
- }
- UnRef();
- }
-
- TInstant Deadline() const override {
- return TBaseAsyncContext<TService>::Deadline();
- }
-
- TSet<TStringBuf> GetPeerMetaKeys() const override {
- return TBaseAsyncContext<TService>::GetPeerMetaKeys();
- }
-
- TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const override {
- return TBaseAsyncContext<TService>::GetPeerMetaValues(key);
- }
-
- TVector<TStringBuf> FindClientCert() const override {
- return TBaseAsyncContext<TService>::FindClientCert();
- }
-
- grpc_compression_level GetCompressionLevel() const override {
- return TBaseAsyncContext<TService>::GetCompressionLevel();
- }
-
- //! Get pointer to the request's message.
- const NProtoBuf::Message* GetRequest() const override {
- return Request_;
- }
-
- NProtoBuf::Message* GetRequestMut() override {
- return Request_;
- }
-
-
- TAuthState& GetAuthState() override {
- return AuthState_;
- }
-
- void Reply(NProtoBuf::Message* resp, ui32 status) override {
- WriteDataOk(resp, status);
- }
-
- void Reply(grpc::ByteBuffer* resp, ui32 status) override {
- WriteByteDataOk(resp, status);
- }
-
- void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details) override {
- FinishGrpcStatus(code, msg, details, false);
- }
-
- void ReplyUnauthenticated(const TString& in) override {
- const TString message = in.empty() ? TString("unauthenticated") : TString("unauthenticated, ") + in;
- FinishGrpcStatus(grpc::StatusCode::UNAUTHENTICATED, message, "", false);
- }
-
- void SetNextReplyCallback(TOnNextReply&& cb) override {
- NextReplyCb_ = cb;
- }
-
- void AddTrailingMetadata(const TString& key, const TString& value) override {
- this->Context.AddTrailingMetadata(key, value);
- }
-
- void FinishStreamingOk() override {
- GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (enqueued)", this, Name_,
- this->Context.peer().c_str());
- auto cb = [this]() {
- StateFunc_ = &TThis::SetFinishDone;
- GRPC_LOG_DEBUG(Logger_, "[%p] finished streaming Name# %s peer# %s (pushed to grpc)", this, Name_,
- this->Context.peer().c_str());
-
- OnBeforeCall();
- Finished_ = true;
- StreamWriter_->Finish(grpc::Status::OK, GetGRpcTag());
- };
- StreamAdaptor_->Enqueue(std::move(cb), false);
- }
-
- google::protobuf::Arena* GetArena() override {
- return &Arena_;
- }
-
- void UseDatabase(const TString& database) override {
- Counters_->UseDatabase(database);
- }
-
-private:
- void Clone() {
- if (!Server_->IsShuttingDown()) {
- if (RequestCallback_) {
- MakeIntrusive<TThis>(
- Server_, this->Service, this->CQ, Cb_, RequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
- } else {
- MakeIntrusive<TThis>(
- Server_, this->Service, this->CQ, Cb_, StreamRequestCallback_, Name_, Logger_, Counters_->Clone(), RequestLimiter_)->Run();
- }
- }
- }
-
- void OnBeforeCall() {
- Y_ABORT_UNLESS(!RequestDestroyed_, "Cannot start grpc calls after request is already destroyed");
- Y_ABORT_UNLESS(!Finished_, "Cannot start grpc calls after request is finished");
- bool wasInProgress = std::exchange(CallInProgress_, true);
- Y_ABORT_UNLESS(!wasInProgress, "Another grpc call is already in progress");
- }
-
- void OnAfterCall() {
- Y_ABORT_UNLESS(!RequestDestroyed_, "Finished grpc call after request is already destroyed");
- bool wasInProgress = std::exchange(CallInProgress_, false);
- Y_ABORT_UNLESS(wasInProgress, "Finished grpc call that was not in progress");
- }
-
- void WriteDataOk(NProtoBuf::Message* resp, ui32 status) {
- auto makeResponseString = [&] {
- TString x;
- TOutProtoPrinter printer;
- printer.SetSingleLineMode(true);
- printer.PrintToString(*resp, &x);
- return x;
- };
-
- auto sz = (size_t)resp->ByteSize();
- if (Writer_) {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s", this, Name_,
- makeResponseString().data(), this->Context.peer().c_str());
- StateFunc_ = &TThis::SetFinishDone;
- ResponseSize = sz;
- ResponseStatus = status;
- Y_ABORT_UNLESS(this->Context.c_call());
- OnBeforeCall();
- Finished_ = true;
- Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
- } else {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# %s peer# %s (enqueued)",
- this, Name_, makeResponseString().data(), this->Context.peer().c_str());
-
- // because of std::function cannot hold move-only captured object
- // we allocate shared object on heap to avoid message copy
- auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
- auto cb = [this, uResp = std::move(uResp), sz, status]() {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s peer# %s (pushed to grpc)",
- this, Name_, this->Context.peer().c_str());
- StateFunc_ = &TThis::NextReply;
- ResponseSize += sz;
- ResponseStatus = status;
- OnBeforeCall();
- StreamWriter_->Write(*uResp, GetGRpcTag());
- };
- StreamAdaptor_->Enqueue(std::move(cb), false);
- }
- }
-
- void WriteByteDataOk(grpc::ByteBuffer* resp, ui32 status) {
- auto sz = resp->Length();
- if (Writer_) {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s", this, Name_,
- this->Context.peer().c_str());
- StateFunc_ = &TThis::SetFinishDone;
- ResponseSize = sz;
- ResponseStatus = status;
- OnBeforeCall();
- Finished_ = true;
- Writer_->Finish(TUniversalResponseRef<TOut>(resp), grpc::Status::OK, GetGRpcTag());
- } else {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (enqueued)", this, Name_,
- this->Context.peer().c_str());
-
- // because of std::function cannot hold move-only captured object
- // we allocate shared object on heap to avoid buffer copy
- auto uResp = MakeIntrusive<TUniversalResponse<TOut>>(resp);
- auto cb = [this, uResp = std::move(uResp), sz, status]() {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s data# byteString peer# %s (pushed to grpc)",
- this, Name_, this->Context.peer().c_str());
- StateFunc_ = &TThis::NextReply;
- ResponseSize += sz;
- ResponseStatus = status;
- OnBeforeCall();
- StreamWriter_->Write(*uResp, GetGRpcTag());
- };
- StreamAdaptor_->Enqueue(std::move(cb), false);
- }
- }
-
- void FinishGrpcStatus(grpc::StatusCode code, const TString& msg, const TString& details, bool urgent) {
- Y_ABORT_UNLESS(code != grpc::OK);
- if (code == grpc::StatusCode::UNAUTHENTICATED) {
- Counters_->CountNotAuthenticated();
- } else if (code == grpc::StatusCode::RESOURCE_EXHAUSTED) {
- Counters_->CountResourceExhausted();
- }
-
- if (Writer_) {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)", this,
- Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
- StateFunc_ = &TThis::SetFinishError;
- TOut resp;
- OnBeforeCall();
- Finished_ = true;
- Writer_->Finish(TUniversalResponseRef<TOut>(&resp), grpc::Status(code, msg, details), GetGRpcTag());
- } else {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"
- " (enqueued)", this, Name_, msg.c_str(), this->Context.peer().c_str(), (int)code);
- auto cb = [this, code, msg, details]() {
- GRPC_LOG_DEBUG(Logger_, "[%p] issuing response Name# %s nodata (%s) peer# %s, grpc status# (%d)"
- " (pushed to grpc)", this, Name_, msg.c_str(),
- this->Context.peer().c_str(), (int)code);
- StateFunc_ = &TThis::SetFinishError;
- OnBeforeCall();
- Finished_ = true;
- StreamWriter_->Finish(grpc::Status(code, msg, details), GetGRpcTag());
- };
- StreamAdaptor_->Enqueue(std::move(cb), urgent);
- }
- }
-
- bool SetRequestDone(bool ok) {
- OnAfterCall();
-
- auto makeRequestString = [&] {
- TString resp;
- if (ok) {
- TInProtoPrinter printer;
- printer.SetSingleLineMode(true);
- printer.PrintToString(*Request_, &resp);
- } else {
- resp = "<not ok>";
- }
- return resp;
- };
- GRPC_LOG_DEBUG(Logger_, "[%p] received request Name# %s ok# %s data# %s peer# %s", this, Name_,
- ok ? "true" : "false", makeRequestString().data(), this->Context.peer().c_str());
-
- if (this->Context.c_call() == nullptr) {
- Y_ABORT_UNLESS(!ok);
- // One ref by OnFinishTag, grpc will not call this tag if no request received
- UnRef();
- } else if (!(RequestRegistered_ = Server_->RegisterRequestCtx(this))) {
- // Request cannot be registered due to shutdown
- // It's unsafe to continue, so drop this request without processing
- GRPC_LOG_DEBUG(Logger_, "[%p] dropping request Name# %s due to shutdown", this, Name_);
- this->Context.TryCancel();
- return false;
- }
-
- Clone(); // TODO: Request pool?
- if (!ok) {
- Counters_->CountNotOkRequest();
- return false;
- }
-
- if (IncRequest()) {
- // Adjust counters.
- RequestSize = Request_->ByteSize();
- Counters_->StartProcessing(RequestSize);
- RequestTimer.Reset();
-
- if (!SslServer()) {
- Counters_->CountRequestWithoutTls();
- }
-
- //TODO: Move this in to grpc_request_proxy
- auto maybeDatabase = GetPeerMetaValues(TStringBuf("x-ydb-database"));
- if (maybeDatabase.empty()) {
- Counters_->CountRequestsWithoutDatabase();
- }
- auto maybeToken = GetPeerMetaValues(TStringBuf("x-ydb-auth-ticket"));
- if (maybeToken.empty() || maybeToken[0].empty()) {
- TString db{maybeDatabase ? maybeDatabase[0] : TStringBuf{}};
- Counters_->CountRequestsWithoutToken();
- GRPC_LOG_DEBUG(Logger_, "[%p] received request without user token "
- "Name# %s data# %s peer# %s database# %s", this, Name_,
- makeRequestString().data(), this->Context.peer().c_str(), db.c_str());
- }
-
- // Handle current request.
- Cb_(this);
- } else {
- //This request has not been counted
- SkipUpdateCountersOnError = true;
- FinishGrpcStatus(grpc::StatusCode::RESOURCE_EXHAUSTED, "no resource", "", true);
- }
- return true;
- }
-
- bool NextReply(bool ok) {
- OnAfterCall();
-
- auto logCb = [this, ok](int left) {
- GRPC_LOG_DEBUG(Logger_, "[%p] ready for next reply Name# %s ok# %s peer# %s left# %d", this, Name_,
- ok ? "true" : "false", this->Context.peer().c_str(), left);
- };
-
- if (!ok) {
- logCb(-1);
- DecRequest();
- Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,
- TDuration::Seconds(RequestTimer.Passed()));
- return false;
- }
-
- Ref(); // To prevent destroy during this call in case of execution Finish
- size_t left = StreamAdaptor_->ProcessNext();
- logCb(left);
- if (NextReplyCb_) {
- NextReplyCb_(left);
- }
- // Now it is safe to destroy even if Finish was called
- UnRef();
- return true;
- }
-
- bool SetFinishDone(bool ok) {
- OnAfterCall();
-
- GRPC_LOG_DEBUG(Logger_, "[%p] finished request Name# %s ok# %s peer# %s", this, Name_,
- ok ? "true" : "false", this->Context.peer().c_str());
- //PrintBackTrace();
- DecRequest();
- Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,
- TDuration::Seconds(RequestTimer.Passed()));
- return false;
- }
-
- bool SetFinishError(bool ok) {
- OnAfterCall();
-
- GRPC_LOG_DEBUG(Logger_, "[%p] finished request with error Name# %s ok# %s peer# %s", this, Name_,
- ok ? "true" : "false", this->Context.peer().c_str());
- if (!SkipUpdateCountersOnError) {
- DecRequest();
- Counters_->FinishProcessing(RequestSize, ResponseSize, ok, ResponseStatus,
- TDuration::Seconds(RequestTimer.Passed()));
- }
- return false;
- }
-
- // Returns pointer to IQueueEvent to pass into grpc c runtime
- // Implicit C style cast from this to void* is wrong due to multiple inheritance
- void* GetGRpcTag() {
- return static_cast<IQueueEvent*>(this);
- }
-
- void OnFinish(EQueueEventStatus evStatus) {
- if (this->Context.IsCancelled()) {
- ClientLost_.store(true);
- FinishPromise_.SetValue(EFinishStatus::CANCEL);
- } else {
- FinishPromise_.SetValue(evStatus == EQueueEventStatus::OK ? EFinishStatus::OK : EFinishStatus::ERROR);
- }
- }
-
- bool IncRequest() {
- if (!Server_->IncRequest())
- return false;
-
- if (!RequestLimiter_)
- return true;
-
- if (!RequestLimiter_->IncRequest()) {
- Server_->DecRequest();
- return false;
- }
-
- return true;
- }
-
- void DecRequest() {
- if (RequestLimiter_) {
- RequestLimiter_->DecRequest();
- }
- Server_->DecRequest();
- }
-
- using TStateFunc = bool (TThis::*)(bool);
- TService* Server_;
- TOnRequest Cb_;
- TRequestCallback RequestCallback_;
- TStreamRequestCallback StreamRequestCallback_;
- const char* const Name_;
- TLoggerPtr Logger_;
- ICounterBlockPtr Counters_;
- IGRpcRequestLimiterPtr RequestLimiter_;
-
- THolder<grpc::ServerAsyncResponseWriter<TUniversalResponseRef<TOut>>> Writer_;
- THolder<grpc::ServerAsyncWriterInterface<TUniversalResponse<TOut>>> StreamWriter_;
- TStateFunc StateFunc_;
- TIn* Request_;
-
- google::protobuf::Arena Arena_;
- TOnNextReply NextReplyCb_;
- ui32 RequestSize = 0;
- ui32 ResponseSize = 0;
- ui32 ResponseStatus = 0;
- THPTimer RequestTimer;
- TAuthState AuthState_ = 0;
- bool RequestRegistered_ = false;
- bool RequestDestroyed_ = false;
- bool CallInProgress_ = false;
- bool Finished_ = false;
-
- using TFixedEvent = TQueueFixedEvent<TGRpcRequestImpl>;
- TFixedEvent OnFinishTag = { this, &TGRpcRequestImpl::OnFinish };
- NThreading::TPromise<EFinishStatus> FinishPromise_;
- bool SkipUpdateCountersOnError = false;
- IStreamAdaptor::TPtr StreamAdaptor_;
- std::atomic<bool> ClientLost_ = false;
-};
-
-template<typename TIn, typename TOut, typename TService, typename TInProtoPrinter=google::protobuf::TextFormat::Printer, typename TOutProtoPrinter=google::protobuf::TextFormat::Printer>
-class TGRpcRequest: public TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter> {
- using TBase = TGRpcRequestImpl<TIn, TOut, TService, TInProtoPrinter, TOutProtoPrinter>;
-public:
- TGRpcRequest(TService* server,
- typename TService::TCurrentGRpcService::AsyncService* service,
- grpc::ServerCompletionQueue* cq,
- typename TBase::TOnRequest cb,
- typename TBase::TRequestCallback requestCallback,
- const char* name,
- TLoggerPtr logger,
- ICounterBlockPtr counters,
- IGRpcRequestLimiterPtr limiter = nullptr)
- : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), std::move(limiter)}
- {
- }
-
- TGRpcRequest(TService* server,
- typename TService::TCurrentGRpcService::AsyncService* service,
- grpc::ServerCompletionQueue* cq,
- typename TBase::TOnRequest cb,
- typename TBase::TStreamRequestCallback requestCallback,
- const char* name,
- TLoggerPtr logger,
- ICounterBlockPtr counters)
- : TBase{server, service, cq, std::move(cb), std::move(requestCallback), name, std::move(logger), std::move(counters), nullptr}
- {
- }
-};
-
-} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_request_base.h b/library/cpp/grpc/server/grpc_request_base.h
deleted file mode 100644
index 60b38805ed..0000000000
--- a/library/cpp/grpc/server/grpc_request_base.h
+++ /dev/null
@@ -1,124 +0,0 @@
-#pragma once
-
-#include <google/protobuf/message.h>
-#include <library/cpp/threading/future/future.h>
-
-#include <grpc++/server_context.h>
-
-namespace grpc {
-class ByteBuffer;
-}
-
-namespace NGrpc {
-
-extern const char* GRPC_USER_AGENT_HEADER;
-
-struct TAuthState {
- enum EAuthState {
- AS_NOT_PERFORMED,
- AS_OK,
- AS_FAIL,
- AS_UNAVAILABLE
- };
- TAuthState(bool needAuth)
- : NeedAuth(needAuth)
- , State(AS_NOT_PERFORMED)
- {}
- bool NeedAuth;
- EAuthState State;
-};
-
-
-//! An interface that may be used to limit concurrency of requests
-class IGRpcRequestLimiter: public TThrRefBase {
-public:
- virtual bool IncRequest() = 0;
- virtual void DecRequest() = 0;
-};
-
-using IGRpcRequestLimiterPtr = TIntrusivePtr<IGRpcRequestLimiter>;
-
-//! State of current request
-class IRequestContextBase: public TThrRefBase {
-public:
- enum class EFinishStatus {
- OK,
- ERROR,
- CANCEL
- };
- using TAsyncFinishResult = NThreading::TFuture<EFinishStatus>;
-
- using TOnNextReply = std::function<void (size_t left)>;
-
- //! Get pointer to the request's message.
- virtual const NProtoBuf::Message* GetRequest() const = 0;
-
- //! Get mutable pointer to the request's message.
- virtual NProtoBuf::Message* GetRequestMut() = 0;
-
- //! Get current auth state
- virtual TAuthState& GetAuthState() = 0;
-
- //! Send common response (The request shoult be created for protobuf response type)
- //! Implementation can swap protobuf message
- virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0;
-
- //! Send serialised response (The request shoult be created for bytes response type)
- //! Implementation can swap ByteBuffer
- virtual void Reply(grpc::ByteBuffer* resp, ui32 status = 0) = 0;
-
- //! Send grpc UNAUTHENTICATED status
- virtual void ReplyUnauthenticated(const TString& in) = 0;
-
- //! Send grpc error
- virtual void ReplyError(grpc::StatusCode code, const TString& msg, const TString& details = "") = 0;
-
- //! Returns deadline (server epoch related) if peer set it on its side, or Instanse::Max() otherwise
- virtual TInstant Deadline() const = 0;
-
- //! Returns available peer metadata keys
- virtual TSet<TStringBuf> GetPeerMetaKeys() const = 0;
-
- //! Returns peer optional metavalue
- virtual TVector<TStringBuf> GetPeerMetaValues(TStringBuf key) const = 0;
-
- virtual TVector<TStringBuf> FindClientCert() const = 0;
-
- //! Returns request compression level
- virtual grpc_compression_level GetCompressionLevel() const = 0;
-
- //! Returns protobuf arena allocator associated with current request
- //! Lifetime of the arena is lifetime of the context
- virtual google::protobuf::Arena* GetArena() = 0;
-
- //! Add trailing metadata in to grpc context
- //! The metadata will be send at the time of rpc finish
- virtual void AddTrailingMetadata(const TString& key, const TString& value) = 0;
-
- //! Use validated database name for counters
- virtual void UseDatabase(const TString& database) = 0;
-
- // Streaming part
-
- //! Set callback. The callback will be called when response deliverid to the client
- //! after that we can call Reply again in streaming mode. Yes, GRpc says there is only one
- //! reply in flight
- virtual void SetNextReplyCallback(TOnNextReply&& cb) = 0;
-
- //! Finish streaming reply
- virtual void FinishStreamingOk() = 0;
-
- //! Returns future to get cancel of finish notification
- virtual TAsyncFinishResult GetFinishFuture() = 0;
-
- //! Returns peer address
- virtual TString GetPeer() const = 0;
-
- //! Returns true if server is using ssl
- virtual bool SslServer() const = 0;
-
- //! Returns true if client was not interested in result (but we still must send response to make grpc happy)
- virtual bool IsClientLost() const = 0;
-};
-
-} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_response.h b/library/cpp/grpc/server/grpc_response.h
deleted file mode 100644
index 8e9afe44d5..0000000000
--- a/library/cpp/grpc/server/grpc_response.h
+++ /dev/null
@@ -1,90 +0,0 @@
-#pragma once
-
-#include <grpc++/impl/codegen/byte_buffer.h>
-#include <grpc++/impl/codegen/proto_utils.h>
-
-#include <variant>
-
-namespace NGrpc {
-
-/**
- * Universal response that owns underlying message or buffer.
- */
-template <typename TMsg>
-class TUniversalResponse: public TAtomicRefCount<TUniversalResponse<TMsg>>, public TMoveOnly {
- friend class grpc::SerializationTraits<TUniversalResponse<TMsg>>;
-
-public:
- explicit TUniversalResponse(NProtoBuf::Message* msg) noexcept
- : Data_{TMsg{}}
- {
- std::get<TMsg>(Data_).Swap(static_cast<TMsg*>(msg));
- }
-
- explicit TUniversalResponse(grpc::ByteBuffer* buffer) noexcept
- : Data_{grpc::ByteBuffer{}}
- {
- std::get<grpc::ByteBuffer>(Data_).Swap(buffer);
- }
-
-private:
- std::variant<TMsg, grpc::ByteBuffer> Data_;
-};
-
-/**
- * Universal response that only keeps reference to underlying message or buffer.
- */
-template <typename TMsg>
-class TUniversalResponseRef: private TMoveOnly {
- friend class grpc::SerializationTraits<TUniversalResponseRef<TMsg>>;
-
-public:
- explicit TUniversalResponseRef(const NProtoBuf::Message* msg)
- : Data_{msg}
- {
- }
-
- explicit TUniversalResponseRef(const grpc::ByteBuffer* buffer)
- : Data_{buffer}
- {
- }
-
-private:
- std::variant<const NProtoBuf::Message*, const grpc::ByteBuffer*> Data_;
-};
-
-} // namespace NGrpc
-
-namespace grpc {
-
-template <typename TMsg>
-class SerializationTraits<NGrpc::TUniversalResponse<TMsg>> {
-public:
- static Status Serialize(
- const NGrpc::TUniversalResponse<TMsg>& resp,
- ByteBuffer* buffer,
- bool* ownBuffer)
- {
- return std::visit([&](const auto& data) {
- using T = std::decay_t<decltype(data)>;
- return SerializationTraits<T>::Serialize(data, buffer, ownBuffer);
- }, resp.Data_);
- }
-};
-
-template <typename TMsg>
-class SerializationTraits<NGrpc::TUniversalResponseRef<TMsg>> {
-public:
- static Status Serialize(
- const NGrpc::TUniversalResponseRef<TMsg>& resp,
- ByteBuffer* buffer,
- bool* ownBuffer)
- {
- return std::visit([&](const auto* data) {
- using T = std::decay_t<std::remove_pointer_t<decltype(data)>>;
- return SerializationTraits<T>::Serialize(*data, buffer, ownBuffer);
- }, resp.Data_);
- }
-};
-
-} // namespace grpc
diff --git a/library/cpp/grpc/server/grpc_server.cpp b/library/cpp/grpc/server/grpc_server.cpp
deleted file mode 100644
index a38d4c9da6..0000000000
--- a/library/cpp/grpc/server/grpc_server.cpp
+++ /dev/null
@@ -1,251 +0,0 @@
-#include "grpc_server.h"
-
-#include <util/string/join.h>
-#include <util/generic/yexception.h>
-#include <util/system/thread.h>
-#include <util/generic/map.h>
-
-#include <grpc++/resource_quota.h>
-#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
-
-#if !defined(_WIN32) && !defined(_WIN64)
-
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-
-#endif
-
-namespace NGrpc {
-
-using NThreading::TFuture;
-
-static void PullEvents(grpc::ServerCompletionQueue* cq) {
- TThread::SetCurrentThreadName("grpc_server");
- while (true) {
- void* tag; // uniquely identifies a request.
- bool ok;
-
- if (cq->Next(&tag, &ok)) {
- IQueueEvent* const ev(static_cast<IQueueEvent*>(tag));
-
- if (!ev->Execute(ok)) {
- ev->DestroyRequest();
- }
- } else {
- break;
- }
- }
-}
-
-TGRpcServer::TGRpcServer(const TServerOptions& opts)
- : Options_(opts)
- , Limiter_(Options_.MaxGlobalRequestInFlight)
- {}
-
-TGRpcServer::~TGRpcServer() {
- Y_ABORT_UNLESS(Ts.empty());
- Services_.clear();
-}
-
-void TGRpcServer::AddService(IGRpcServicePtr service) {
- Services_.push_back(service);
-}
-
-void TGRpcServer::Start() {
- TString server_address(Join(":", Options_.Host, Options_.Port)); // https://st.yandex-team.ru/DTCC-695
- using grpc::ServerBuilder;
- using grpc::ResourceQuota;
- ServerBuilder builder;
- auto credentials = grpc::InsecureServerCredentials();
- if (Options_.SslData) {
- grpc::SslServerCredentialsOptions::PemKeyCertPair keycert;
- keycert.cert_chain = std::move(Options_.SslData->Cert);
- keycert.private_key = std::move(Options_.SslData->Key);
- grpc::SslServerCredentialsOptions sslOps;
- sslOps.pem_root_certs = std::move(Options_.SslData->Root);
- sslOps.pem_key_cert_pairs.push_back(keycert);
-
- if (Options_.SslData->DoRequestClientCertificate) {
- sslOps.client_certificate_request = GRPC_SSL_REQUEST_CLIENT_CERTIFICATE_AND_VERIFY;
- }
-
- credentials = grpc::SslServerCredentials(sslOps);
- }
- if (Options_.ExternalListener) {
- Options_.ExternalListener->Init(builder.experimental().AddExternalConnectionAcceptor(
- ServerBuilder::experimental_type::ExternalConnectionType::FROM_FD,
- credentials
- ));
- } else {
- builder.AddListeningPort(server_address, credentials);
- }
- builder.SetMaxReceiveMessageSize(Options_.MaxMessageSize);
- builder.SetMaxSendMessageSize(Options_.MaxMessageSize);
- for (IGRpcServicePtr service : Services_) {
- service->SetServerOptions(Options_);
- builder.RegisterService(service->GetService());
- service->SetGlobalLimiterHandle(&Limiter_);
- }
-
- class TKeepAliveOption: public grpc::ServerBuilderOption {
- public:
- TKeepAliveOption(int idle, int interval)
- : Idle(idle)
- , Interval(interval)
- , KeepAliveEnabled(true)
- {}
-
- TKeepAliveOption()
- : Idle(0)
- , Interval(0)
- , KeepAliveEnabled(false)
- {}
-
- void UpdateArguments(grpc::ChannelArguments *args) override {
- args->SetInt(GRPC_ARG_HTTP2_MAX_PING_STRIKES, 0);
- args->SetInt(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS, 1000);
- if (KeepAliveEnabled) {
- args->SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
- args->SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 1);
- args->SetInt(GRPC_ARG_HTTP2_MIN_SENT_PING_INTERVAL_WITHOUT_DATA_MS, Idle * 1000);
- args->SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, Idle * 1000);
- args->SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, Interval * 1000);
- }
- }
-
- void UpdatePlugins(std::vector<std::unique_ptr<grpc::ServerBuilderPlugin>>* /*plugins*/) override
- {}
- private:
- const int Idle;
- const int Interval;
- const bool KeepAliveEnabled;
- };
-
- if (Options_.KeepAliveEnable) {
- builder.SetOption(std::make_unique<TKeepAliveOption>(
- Options_.KeepAliveIdleTimeoutTriggerSec,
- Options_.KeepAliveProbeIntervalSec));
- } else {
- builder.SetOption(std::make_unique<TKeepAliveOption>());
- }
-
- size_t completionQueueCount = 1;
- if (Options_.WorkersPerCompletionQueue) {
- size_t threadsPerQueue = Max(std::size_t{1}, Options_.WorkersPerCompletionQueue);
- completionQueueCount = (Options_.WorkerThreads + threadsPerQueue - 1) / threadsPerQueue; // ceiling
- } else if (Options_.UseCompletionQueuePerThread) {
- completionQueueCount = Options_.WorkerThreads;
- }
-
- CQS_.reserve(completionQueueCount);
- for (size_t i = 0; i < completionQueueCount; ++i) {
- CQS_.push_back(builder.AddCompletionQueue());
- }
-
- if (Options_.GRpcMemoryQuotaBytes) {
- // See details KIKIMR-6932
- if (Options_.EnableGRpcMemoryQuota) {
- grpc::ResourceQuota quota("memory_bound");
- quota.Resize(Options_.GRpcMemoryQuotaBytes);
-
- builder.SetResourceQuota(quota);
-
- Cerr << "Set GRpc memory quota to: " << Options_.GRpcMemoryQuotaBytes << Endl;
- } else {
- Cerr << "GRpc memory quota was set but disabled due to issues with grpc quoter"
- ", to enable it use EnableGRpcMemoryQuota option" << Endl;
- }
- }
- Options_.ServerBuilderMutator(builder);
- builder.SetDefaultCompressionLevel(Options_.DefaultCompressionLevel);
-
- Server_ = builder.BuildAndStart();
- if (!Server_) {
- ythrow yexception() << "can't start grpc server on " << server_address;
- }
-
- size_t index = 0;
- for (IGRpcServicePtr service : Services_) {
- // TODO: provide something else for services instead of ServerCompletionQueue
- service->InitService(CQS_, Options_.Logger, index++);
- }
-
- Ts.reserve(Options_.WorkerThreads);
- for (size_t i = 0; i < Options_.WorkerThreads; ++i) {
- auto* cq = &CQS_[i % CQS_.size()];
- Ts.push_back(SystemThreadFactory()->Run([cq] {
- PullEvents(cq->get());
- }));
- }
-
- if (Options_.ExternalListener) {
- Options_.ExternalListener->Start();
- }
-}
-
-void TGRpcServer::Stop() {
- for (auto& service : Services_) {
- service->StopService();
- }
-
- auto now = TInstant::Now();
-
- if (Server_) {
- i64 sec = Options_.GRpcShutdownDeadline.Seconds();
- Y_ABORT_UNLESS(Options_.GRpcShutdownDeadline.NanoSecondsOfSecond() <= Max<i32>());
- i32 nanosecOfSec = Options_.GRpcShutdownDeadline.NanoSecondsOfSecond();
- Server_->Shutdown(gpr_timespec{sec, nanosecOfSec, GPR_TIMESPAN});
- }
-
- for (ui64 attempt = 0; ; ++attempt) {
- bool unsafe = false;
- size_t infly = 0;
- for (auto& service : Services_) {
- unsafe |= service->IsUnsafeToShutdown();
- infly += service->RequestsInProgress();
- }
-
- if (!unsafe && !infly)
- break;
-
- auto spent = (TInstant::Now() - now).SecondsFloat();
- if (attempt % 300 == 0) {
- // don't log too much
- Cerr << "GRpc shutdown warning: left infly: " << infly << ", spent: " << spent << " sec" << Endl;
- }
-
- if (!unsafe && spent > Options_.GRpcShutdownDeadline.SecondsFloat())
- break;
- Sleep(TDuration::MilliSeconds(10));
- }
-
- // Always shutdown the completion queue after the server.
- for (auto& cq : CQS_) {
- cq->Shutdown();
- }
-
- for (auto ti = Ts.begin(); ti != Ts.end(); ++ti) {
- (*ti)->Join();
- }
-
- Ts.clear();
-
- if (Options_.ExternalListener) {
- Options_.ExternalListener->Stop();
- }
-}
-
-ui16 TGRpcServer::GetPort() const {
- return Options_.Port;
-}
-
-TString TGRpcServer::GetHost() const {
- return Options_.Host;
-}
-
-const TVector<TGRpcServer::IGRpcServicePtr>& TGRpcServer::GetServices() const {
- return Services_;
-}
-
-} // namespace NGrpc
diff --git a/library/cpp/grpc/server/grpc_server.h b/library/cpp/grpc/server/grpc_server.h
deleted file mode 100644
index 2d746de99b..0000000000
--- a/library/cpp/grpc/server/grpc_server.h
+++ /dev/null
@@ -1,404 +0,0 @@
-#pragma once
-
-#include "grpc_request_base.h"
-#include "logger.h"
-
-#include <library/cpp/threading/future/future.h>
-
-#include <util/generic/ptr.h>
-#include <util/generic/string.h>
-#include <util/generic/vector.h>
-#include <util/generic/maybe.h>
-#include <util/generic/queue.h>
-#include <util/generic/hash_set.h>
-#include <util/system/types.h>
-#include <util/system/mutex.h>
-#include <util/thread/factory.h>
-
-#include <grpc++/grpc++.h>
-
-namespace NGrpc {
-
-constexpr ui64 DEFAULT_GRPC_MESSAGE_SIZE_LIMIT = 64000000;
-
-struct TSslData {
- TString Cert;
- TString Key;
- TString Root;
- bool DoRequestClientCertificate = false;
-};
-
-struct IExternalListener
- : public TThrRefBase
-{
- using TPtr = TIntrusivePtr<IExternalListener>;
- virtual void Init(std::unique_ptr<grpc::experimental::ExternalConnectionAcceptor> acceptor) = 0;
- virtual void Start() = 0;
- virtual void Stop() = 0;
-};
-
-//! Server's options.
-struct TServerOptions {
-#define DECLARE_FIELD(name, type, default) \
- type name{default}; \
- inline TServerOptions& Set##name(const type& value) { \
- name = value; \
- return *this; \
- }
-
- //! Hostname of server to bind to.
- DECLARE_FIELD(Host, TString, "[::]");
- //! Service port.
- DECLARE_FIELD(Port, ui16, 0);
-
- //! Number of worker threads.
- DECLARE_FIELD(WorkerThreads, size_t, 2);
-
- //! Number of workers per completion queue, i.e. when
- // WorkerThreads=8 and PriorityWorkersPerCompletionQueue=2
- // there will be 4 completion queues. When set to 0 then
- // only UseCompletionQueuePerThread affects number of CQ.
- DECLARE_FIELD(WorkersPerCompletionQueue, size_t, 0);
-
- //! Obsolete. Create one completion queue per thread.
- // Setting true equals to the WorkersPerCompletionQueue=1
- DECLARE_FIELD(UseCompletionQueuePerThread, bool, false);
-
- //! Memory quota size for grpc server in bytes. Zero means unlimited.
- DECLARE_FIELD(GRpcMemoryQuotaBytes, size_t, 0);
-
- //! Enable Grpc memory quota feature.
- DECLARE_FIELD(EnableGRpcMemoryQuota, bool, false);
-
- //! How long to wait until pending rpcs are forcefully terminated.
- DECLARE_FIELD(GRpcShutdownDeadline, TDuration, TDuration::Seconds(30));
-
- //! In/Out message size limit
- DECLARE_FIELD(MaxMessageSize, size_t, DEFAULT_GRPC_MESSAGE_SIZE_LIMIT);
-
- //! Use GRpc keepalive
- DECLARE_FIELD(KeepAliveEnable, TMaybe<bool>, TMaybe<bool>());
-
- //! GRPC_ARG_KEEPALIVE_TIME_MS setting
- DECLARE_FIELD(KeepAliveIdleTimeoutTriggerSec, int, 0);
-
- //! Deprecated, ths option ignored. Will be removed soon.
- DECLARE_FIELD(KeepAliveMaxProbeCount, int, 0);
-
- //! GRPC_ARG_KEEPALIVE_TIMEOUT_MS setting
- DECLARE_FIELD(KeepAliveProbeIntervalSec, int, 0);
-
- //! Max number of requests processing by services (global limit for grpc server)
- DECLARE_FIELD(MaxGlobalRequestInFlight, size_t, 100000);
-
- //! SSL server data
- DECLARE_FIELD(SslData, TMaybe<TSslData>, TMaybe<TSslData>());
-
- //! GRPC auth
- DECLARE_FIELD(UseAuth, bool, false);
-
- //! Default compression level. Used when no compression options provided by client.
- // Mapping to particular compression algorithm depends on client.
- DECLARE_FIELD(DefaultCompressionLevel, grpc_compression_level, GRPC_COMPRESS_LEVEL_NONE);
-
- //! Custom configurator for ServerBuilder.
- DECLARE_FIELD(ServerBuilderMutator, std::function<void(grpc::ServerBuilder&)>, [](grpc::ServerBuilder&){});
-
- DECLARE_FIELD(ExternalListener, IExternalListener::TPtr, nullptr);
-
- //! Logger which will be used to write logs about requests handling (iff appropriate log level is enabled).
- DECLARE_FIELD(Logger, TLoggerPtr, nullptr);
-
-#undef DECLARE_FIELD
-};
-
-class IQueueEvent {
-public:
- virtual ~IQueueEvent() = default;
-
- //! Execute an action defined by implementation.
- virtual bool Execute(bool ok) = 0;
-
- //! It is time to perform action requested by AcquireToken server method. It will be called under lock which is also
- // used in ReturnToken/AcquireToken methods. Default implementation does nothing assuming that request processor does
- // not implement in flight management.
- virtual void Process() {}
-
- //! Finish and destroy request.
- virtual void DestroyRequest() = 0;
-};
-
-class ICancelableContext {
-public:
- virtual void Shutdown() = 0;
- virtual ~ICancelableContext() = default;
-
-private:
- template<class T>
- friend class TGrpcServiceBase;
-
- // Shard assigned by RegisterRequestCtx. This field is not thread-safe
- // because RegisterRequestCtx may only be called once for a single service,
- // so it's only assigned once.
- size_t ShardIndex = size_t(-1);
-};
-
-template <class TLimit>
-class TInFlightLimiterImpl {
-public:
- explicit TInFlightLimiterImpl(const TLimit& limit)
- : Limit_(limit)
- {}
-
- bool Inc() {
- i64 newVal;
- i64 prev;
- do {
- prev = AtomicGet(CurInFlightReqs_);
- Y_ABORT_UNLESS(prev >= 0);
- if (Limit_ && prev > Limit_) {
- return false;
- }
- newVal = prev + 1;
- } while (!AtomicCas(&CurInFlightReqs_, newVal, prev));
- return true;
- }
-
- void Dec() {
- i64 newVal = AtomicDecrement(CurInFlightReqs_);
- Y_ABORT_UNLESS(newVal >= 0);
- }
-
- i64 GetCurrentInFlight() const {
- return AtomicGet(CurInFlightReqs_);
- }
-
-private:
- const TLimit Limit_;
- TAtomic CurInFlightReqs_ = 0;
-};
-
-using TGlobalLimiter = TInFlightLimiterImpl<i64>;
-
-
-class IGRpcService: public TThrRefBase {
-public:
- virtual grpc::Service* GetService() = 0;
- virtual void StopService() noexcept = 0;
-
- virtual void InitService(grpc::ServerCompletionQueue* cq, TLoggerPtr logger) = 0;
-
- virtual void InitService(
- const std::vector<std::unique_ptr<grpc::ServerCompletionQueue>>& cqs,
- TLoggerPtr logger,
- size_t index)
- {
- InitService(cqs[index % cqs.size()].get(), logger);
- }
-
- virtual void SetGlobalLimiterHandle(TGlobalLimiter* limiter) = 0;
- virtual bool IsUnsafeToShutdown() const = 0;
- virtual size_t RequestsInProgress() const = 0;
-
- /**
- * Called before service is added to the server builder. This allows
- * service to inspect server options and initialize accordingly.
- */
- virtual void SetServerOptions(const TServerOptions& options) = 0;
-};
-
-template<typename T>
-class TGrpcServiceBase: public IGRpcService {
-public:
- class TShutdownGuard {
- using TOwner = TGrpcServiceBase<T>;
- friend class TGrpcServiceBase<T>;
-
- public:
- TShutdownGuard()
- : Owner(nullptr)
- { }
-
- ~TShutdownGuard() {
- Release();
- }
-
- TShutdownGuard(TShutdownGuard&& other)
- : Owner(other.Owner)
- {
- other.Owner = nullptr;
- }
-
- TShutdownGuard& operator=(TShutdownGuard&& other) {
- if (Y_LIKELY(this != &other)) {
- Release();
- Owner = other.Owner;
- other.Owner = nullptr;
- }
- return *this;
- }
-
- explicit operator bool() const {
- return bool(Owner);
- }
-
- void Release() {
- if (Owner) {
- AtomicDecrement(Owner->GuardCount_);
- Owner = nullptr;
- }
- }
-
- TShutdownGuard(const TShutdownGuard&) = delete;
- TShutdownGuard& operator=(const TShutdownGuard&) = delete;
-
- private:
- explicit TShutdownGuard(TOwner* owner)
- : Owner(owner)
- { }
-
- private:
- TOwner* Owner;
- };
-
-public:
- using TCurrentGRpcService = T;
-
- void StopService() noexcept override {
- AtomicSet(ShuttingDown_, 1);
-
- for (auto& shard : Shards_) {
- with_lock(shard.Lock_) {
- // Send TryCansel to event (can be send after finishing).
- // Actual dtors will be called from grpc thread, so deadlock impossible
- for (auto* request : shard.Requests_) {
- request->Shutdown();
- }
- }
- }
- }
-
- TShutdownGuard ProtectShutdown() noexcept {
- AtomicIncrement(GuardCount_);
- if (IsShuttingDown()) {
- AtomicDecrement(GuardCount_);
- return { };
- }
-
- return TShutdownGuard(this);
- }
-
- bool IsUnsafeToShutdown() const override {
- return AtomicGet(GuardCount_) > 0;
- }
-
- size_t RequestsInProgress() const override {
- size_t c = 0;
- for (auto& shard : Shards_) {
- with_lock(shard.Lock_) {
- c += shard.Requests_.size();
- }
- }
- return c;
- }
-
- void SetServerOptions(const TServerOptions& options) override {
- SslServer_ = bool(options.SslData);
- NeedAuth_ = options.UseAuth;
- }
-
- void SetGlobalLimiterHandle(TGlobalLimiter* /*limiter*/) override {}
-
- //! Check if the server is going to shut down.
- bool IsShuttingDown() const {
- return AtomicGet(ShuttingDown_);
- }
-
- bool SslServer() const {
- return SslServer_;
- }
-
- bool NeedAuth() const {
- return NeedAuth_;
- }
-
- bool RegisterRequestCtx(ICancelableContext* req) {
- if (Y_LIKELY(req->ShardIndex == size_t(-1))) {
- req->ShardIndex = NextShard_.fetch_add(1, std::memory_order_relaxed) % Shards_.size();
- }
-
- auto& shard = Shards_[req->ShardIndex];
- with_lock(shard.Lock_) {
- if (IsShuttingDown()) {
- return false;
- }
-
- auto r = shard.Requests_.emplace(req);
- Y_ABORT_UNLESS(r.second, "Ctx already registered");
- }
-
- return true;
- }
-
- void DeregisterRequestCtx(ICancelableContext* req) {
- Y_ABORT_UNLESS(req->ShardIndex != size_t(-1), "Ctx does not have an assigned shard index");
-
- auto& shard = Shards_[req->ShardIndex];
- with_lock(shard.Lock_) {
- Y_ABORT_UNLESS(shard.Requests_.erase(req), "Ctx is not registered");
- }
- }
-
-protected:
- using TGrpcAsyncService = typename TCurrentGRpcService::AsyncService;
- TGrpcAsyncService Service_;
-
- TGrpcAsyncService* GetService() override {
- return &Service_;
- }
-
-private:
- TAtomic ShuttingDown_ = 0;
- TAtomic GuardCount_ = 0;
-
- bool SslServer_ = false;
- bool NeedAuth_ = false;
-
- struct TShard {
- TAdaptiveLock Lock_;
- THashSet<ICancelableContext*> Requests_;
- };
-
- // Note: benchmarks showed 4 shards is enough to scale to ~30 threads
- TVector<TShard> Shards_{ size_t(4) };
- std::atomic<size_t> NextShard_{ 0 };
-};
-
-class TGRpcServer {
-public:
- using IGRpcServicePtr = TIntrusivePtr<IGRpcService>;
- TGRpcServer(const TServerOptions& opts);
- ~TGRpcServer();
- void AddService(IGRpcServicePtr service);
- void Start();
- // Send stop to registred services and call Shutdown on grpc server
- // This method MUST be called before destroying TGRpcServer
- void Stop();
- ui16 GetPort() const;
- TString GetHost() const;
-
- const TVector<IGRpcServicePtr>& GetServices() const;
-
-private:
- using IThreadRef = TAutoPtr<IThreadFactory::IThread>;
-
- const TServerOptions Options_;
- std::unique_ptr<grpc::Server> Server_;
- std::vector<std::unique_ptr<grpc::ServerCompletionQueue>> CQS_;
- TVector<IThreadRef> Ts;
-
- TVector<IGRpcServicePtr> Services_;
- TGlobalLimiter Limiter_;
-};
-
-} // namespace NGrpc
diff --git a/library/cpp/grpc/server/logger.h b/library/cpp/grpc/server/logger.h
deleted file mode 100644
index 53af26be9c..0000000000
--- a/library/cpp/grpc/server/logger.h
+++ /dev/null
@@ -1,43 +0,0 @@
-#pragma once
-
-#include <library/cpp/logger/priority.h>
-
-#include <util/generic/ptr.h>
-
-namespace NGrpc {
-
-class TLogger: public TThrRefBase {
-protected:
- TLogger() = default;
-
-public:
- [[nodiscard]]
- bool IsEnabled(ELogPriority priority) const noexcept {
- return DoIsEnabled(priority);
- }
-
- void Y_PRINTF_FORMAT(3, 4) Write(ELogPriority priority, const char* format, ...) noexcept {
- va_list args;
- va_start(args, format);
- DoWrite(priority, format, args);
- va_end(args);
- }
-
-protected:
- virtual bool DoIsEnabled(ELogPriority priority) const noexcept = 0;
- virtual void DoWrite(ELogPriority p, const char* format, va_list args) noexcept = 0;
-};
-
-using TLoggerPtr = TIntrusivePtr<TLogger>;
-
-#define GRPC_LOG_DEBUG(logger, format, ...) \
- if (logger && logger->IsEnabled(ELogPriority::TLOG_DEBUG)) { \
- logger->Write(ELogPriority::TLOG_DEBUG, format, __VA_ARGS__); \
- } else { }
-
-#define GRPC_LOG_INFO(logger, format, ...) \
- if (logger && logger->IsEnabled(ELogPriority::TLOG_INFO)) { \
- logger->Write(ELogPriority::TLOG_INFO, format, __VA_ARGS__); \
- } else { }
-
-} // namespace NGrpc
diff --git a/library/cpp/grpc/server/ut/grpc_response_ut.cpp b/library/cpp/grpc/server/ut/grpc_response_ut.cpp
deleted file mode 100644
index 97952f4166..0000000000
--- a/library/cpp/grpc/server/ut/grpc_response_ut.cpp
+++ /dev/null
@@ -1,86 +0,0 @@
-#include <library/cpp/grpc/server/grpc_response.h>
-#include <library/cpp/testing/unittest/registar.h>
-
-#include <google/protobuf/duration.pb.h>
-#include <grpc++/impl/codegen/proto_utils.h>
-#include <grpc++/impl/grpc_library.h>
-
-using namespace NGrpc;
-
-using google::protobuf::Duration;
-
-Y_UNIT_TEST_SUITE(ResponseTest) {
-
- template <typename T>
- grpc::ByteBuffer Serialize(T resp) {
- grpc::ByteBuffer buf;
- bool ownBuf = false;
- grpc::Status status = grpc::SerializationTraits<T>::Serialize(resp, &buf, &ownBuf);
- UNIT_ASSERT(status.ok());
- return buf;
- }
-
- template <typename T>
- T Deserialize(grpc::ByteBuffer* buf) {
- T message;
- auto status = grpc::SerializationTraits<T>::Deserialize(buf, &message);
- UNIT_ASSERT(status.ok());
- return message;
- }
-
- Y_UNIT_TEST(UniversalResponseMsg) {
- Duration d1;
- d1.set_seconds(12345);
- d1.set_nanos(67890);
-
- auto buf = Serialize(TUniversalResponse<Duration>(&d1));
- Duration d2 = Deserialize<Duration>(&buf);
-
- UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345);
- UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890);
- }
-
- Y_UNIT_TEST(UniversalResponseBuf) {
- Duration d1;
- d1.set_seconds(123);
- d1.set_nanos(456);
-
- TString data = d1.SerializeAsString();
- grpc::Slice dataSlice{data.data(), data.size()};
- grpc::ByteBuffer dataBuf{&dataSlice, 1};
-
- auto buf = Serialize(TUniversalResponse<Duration>(&dataBuf));
- Duration d2 = Deserialize<Duration>(&buf);
-
- UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123);
- UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456);
- }
-
- Y_UNIT_TEST(UniversalResponseRefMsg) {
- Duration d1;
- d1.set_seconds(12345);
- d1.set_nanos(67890);
-
- auto buf = Serialize(TUniversalResponseRef<Duration>(&d1));
- Duration d2 = Deserialize<Duration>(&buf);
-
- UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345);
- UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890);
- }
-
- Y_UNIT_TEST(UniversalResponseRefBuf) {
- Duration d1;
- d1.set_seconds(123);
- d1.set_nanos(456);
-
- TString data = d1.SerializeAsString();
- grpc::Slice dataSlice{data.data(), data.size()};
- grpc::ByteBuffer dataBuf{&dataSlice, 1};
-
- auto buf = Serialize(TUniversalResponseRef<Duration>(&dataBuf));
- Duration d2 = Deserialize<Duration>(&buf);
-
- UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123);
- UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456);
- }
-}
diff --git a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
deleted file mode 100644
index c34d3b8c2b..0000000000
--- a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
+++ /dev/null
@@ -1,121 +0,0 @@
-#include <library/cpp/grpc/server/grpc_request.h>
-#include <library/cpp/testing/unittest/registar.h>
-#include <library/cpp/testing/unittest/tests_data.h>
-
-#include <util/system/thread.h>
-#include <util/thread/pool.h>
-
-using namespace NGrpc;
-
-// Here we emulate stream data producer
-class TOrderedProducer: public TThread {
-public:
- TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp)
- : TThread(&ThreadProc, this)
- , Adaptor_(adaptor)
- , Max_(max)
- , WithSleep_(withSleep)
- , ConsumerOp_(std::move(consumerOp))
- {}
-
- static void* ThreadProc(void* _this) {
- SetCurrentThreadName("OrderedProducerThread");
- static_cast<TOrderedProducer*>(_this)->Exec();
- return nullptr;
- }
-
- void Exec() {
- for (ui64 i = 0; i < Max_; i++) {
- auto cb = [i, this]() mutable {
- ConsumerOp_(i);
- };
- Adaptor_->Enqueue(std::move(cb), false);
- if (WithSleep_ && (i % 256 == 0)) {
- Sleep(TDuration::MilliSeconds(10));
- }
- }
- }
-
-private:
- IStreamAdaptor* Adaptor_;
- const ui64 Max_;
- const bool WithSleep_;
- std::function<void(ui64)> ConsumerOp_;
-};
-
-Y_UNIT_TEST_SUITE(StreamAdaptor) {
- static void OrderingTest(size_t threads, bool withSleep) {
-
- auto adaptor = CreateStreamAdaptor();
-
- const i64 max = 10000;
-
- // Here we will emulate grpc stream (NextReply call after writing)
- std::unique_ptr<IThreadPool> consumerQueue(new TThreadPool(TThreadPool::TParams().SetBlocking(false).SetCatching(false)));
- // And make sure only one request inflight (see UNIT_ASSERT on adding to the queue)
- consumerQueue->Start(threads, 1);
-
- // Non atomic!!! Stream adaptor must protect us
- ui64 curVal = 0;
-
- // Used just to wait in the main thread
- TAtomic finished = false;
- auto consumerOp = [&finished, &curVal, ptr{adaptor.get()}, queue{consumerQueue.get()}](ui64 i) {
- // Check no reordering inside stream adaptor
- // and no simultanious consumer Op call
- UNIT_ASSERT_VALUES_EQUAL(curVal, i);
- curVal++;
- // We must set finished flag after last ProcessNext, but we can`t compare curVal and max after ProcessNext
- // so compare here and set after
- bool tmp = curVal == max;
- bool res = queue->AddFunc([ptr, &finished, tmp, &curVal, i]() {
- // Additional check the value still same
- // run under tsan makes sure no consumer Op call before we call ProcessNext
- UNIT_ASSERT_VALUES_EQUAL(curVal, i + 1);
- ptr->ProcessNext();
- // Reordering after ProcessNext is possible, so check tmp and set finished to true
- if (tmp)
- AtomicSet(finished, true);
- });
- UNIT_ASSERT(res);
- };
-
- TOrderedProducer producer(adaptor.get(), max, withSleep, std::move(consumerOp));
-
- producer.Start();
- producer.Join();
-
- while (!AtomicGet(finished))
- {
- Sleep(TDuration::MilliSeconds(100));
- }
-
- consumerQueue->Stop();
-
- UNIT_ASSERT_VALUES_EQUAL(curVal, max);
- }
-
- Y_UNIT_TEST(OrderingOneThread) {
- OrderingTest(1, false);
- }
-
- Y_UNIT_TEST(OrderingTwoThreads) {
- OrderingTest(2, false);
- }
-
- Y_UNIT_TEST(OrderingManyThreads) {
- OrderingTest(10, false);
- }
-
- Y_UNIT_TEST(OrderingOneThreadWithSleep) {
- OrderingTest(1, true);
- }
-
- Y_UNIT_TEST(OrderingTwoThreadsWithSleep) {
- OrderingTest(2, true);
- }
-
- Y_UNIT_TEST(OrderingManyThreadsWithSleep) {
- OrderingTest(10, true);
- }
-}
diff --git a/library/cpp/grpc/server/ut/ya.make b/library/cpp/grpc/server/ut/ya.make
deleted file mode 100644
index 94408dc8ce..0000000000
--- a/library/cpp/grpc/server/ut/ya.make
+++ /dev/null
@@ -1,16 +0,0 @@
-UNITTEST_FOR(library/cpp/grpc/server)
-
-TIMEOUT(600)
-SIZE(MEDIUM)
-
-PEERDIR(
- library/cpp/grpc/server
-)
-
-SRCS(
- grpc_response_ut.cpp
- stream_adaptor_ut.cpp
-)
-
-END()
-
diff --git a/library/cpp/grpc/server/ya.make b/library/cpp/grpc/server/ya.make
deleted file mode 100644
index 9ed8f30ef2..0000000000
--- a/library/cpp/grpc/server/ya.make
+++ /dev/null
@@ -1,20 +0,0 @@
-LIBRARY()
-
-SRCS(
- event_callback.cpp
- grpc_request.cpp
- grpc_server.cpp
- grpc_counters.cpp
-)
-
-GENERATE_ENUM_SERIALIZATION(grpc_request_base.h)
-
-PEERDIR(
- contrib/libs/grpc
- library/cpp/monlib/dynamic_counters/percentile
-)
-
-END()
-
-RECURSE_FOR_TESTS(ut)
-