aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/client/grpc_client_low.cpp
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:17 +0300
commit4b11037e5a7d071c63e3c966199fe7102e6462e4 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/client/grpc_client_low.cpp
parent17e20fa084178ddcb16255f974dbde74fb93608b (diff)
downloadydb-4b11037e5a7d071c63e3c966199fe7102e6462e4.tar.gz
Restoring authorship annotation for Daniil Cherednik <dcherednik@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.cpp')
-rw-r--r--library/cpp/grpc/client/grpc_client_low.cpp414
1 files changed, 207 insertions, 207 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp
index ee9e997fa7..73cc908ef8 100644
--- a/library/cpp/grpc/client/grpc_client_low.cpp
+++ b/library/cpp/grpc/client/grpc_client_low.cpp
@@ -1,164 +1,164 @@
-#include "grpc_client_low.h"
-#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
-#include <contrib/libs/grpc/include/grpc/support/log.h>
-
-#include <library/cpp/containers/stack_vector/stack_vec.h>
-
-#include <util/string/printf.h>
+#include "grpc_client_low.h"
+#include <contrib/libs/grpc/src/core/lib/iomgr/socket_mutator.h>
+#include <contrib/libs/grpc/include/grpc/support/log.h>
+
+#include <library/cpp/containers/stack_vector/stack_vec.h>
+
+#include <util/string/printf.h>
#include <util/system/thread.h>
#include <util/random/random.h>
-
-#if !defined(_WIN32) && !defined(_WIN64)
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#endif
-
+
+#if !defined(_WIN32) && !defined(_WIN64)
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#endif
+
namespace NGrpc {
-
-void EnableGRpcTracing() {
- grpc_tracer_set_enabled("tcp", true);
- grpc_tracer_set_enabled("client_channel", true);
- grpc_tracer_set_enabled("channel", true);
- grpc_tracer_set_enabled("api", true);
- grpc_tracer_set_enabled("connectivity_state", true);
- grpc_tracer_set_enabled("handshaker", true);
- grpc_tracer_set_enabled("http", true);
- grpc_tracer_set_enabled("http2_stream_state", true);
- grpc_tracer_set_enabled("op_failure", true);
- grpc_tracer_set_enabled("timer", true);
- gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
-}
-
-class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {
-public:
- TGRpcKeepAliveSocketMutator(int idle, int count, int interval)
- : Idle_(idle)
- , Count_(count)
- , Interval_(interval)
- {
- grpc_socket_mutator_init(this, &VTable);
- }
-private:
- static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {
- return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);
- }
-
- template<typename TVal>
- bool SetOption(int fd, int level, int optname, const TVal& value) {
- return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;
- }
- bool SetOption(int fd) {
- if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {
- Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl;
- return false;
- }
-#ifdef _linux_
- if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {
- Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl;
- return false;
- }
- if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {
- Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl;
- return false;
- }
- if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {
- Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl;
- return false;
- }
-#endif
- return true;
- }
- static bool Mutate(int fd, grpc_socket_mutator* mutator) {
- auto self = Cast(mutator);
- return self->SetOption(fd);
- }
- static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
- const auto* selfA = Cast(a);
- const auto* selfB = Cast(b);
- auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);
- auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);
- return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;
- }
- static void Destroy(grpc_socket_mutator* mutator) {
- delete Cast(mutator);
- }
-
- static grpc_socket_mutator_vtable VTable;
- const int Idle_;
- const int Count_;
- const int Interval_;
-};
-
-grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
- {
- &TGRpcKeepAliveSocketMutator::Mutate,
- &TGRpcKeepAliveSocketMutator::Compare,
- &TGRpcKeepAliveSocketMutator::Destroy
- };
-
+
+void EnableGRpcTracing() {
+ grpc_tracer_set_enabled("tcp", true);
+ grpc_tracer_set_enabled("client_channel", true);
+ grpc_tracer_set_enabled("channel", true);
+ grpc_tracer_set_enabled("api", true);
+ grpc_tracer_set_enabled("connectivity_state", true);
+ grpc_tracer_set_enabled("handshaker", true);
+ grpc_tracer_set_enabled("http", true);
+ grpc_tracer_set_enabled("http2_stream_state", true);
+ grpc_tracer_set_enabled("op_failure", true);
+ grpc_tracer_set_enabled("timer", true);
+ gpr_set_log_verbosity(GPR_LOG_SEVERITY_DEBUG);
+}
+
+class TGRpcKeepAliveSocketMutator : public grpc_socket_mutator {
+public:
+ TGRpcKeepAliveSocketMutator(int idle, int count, int interval)
+ : Idle_(idle)
+ , Count_(count)
+ , Interval_(interval)
+ {
+ grpc_socket_mutator_init(this, &VTable);
+ }
+private:
+ static TGRpcKeepAliveSocketMutator* Cast(grpc_socket_mutator* mutator) {
+ return static_cast<TGRpcKeepAliveSocketMutator*>(mutator);
+ }
+
+ template<typename TVal>
+ bool SetOption(int fd, int level, int optname, const TVal& value) {
+ return setsockopt(fd, level, optname, reinterpret_cast<const char*>(&value), sizeof(value)) == 0;
+ }
+ bool SetOption(int fd) {
+ if (!SetOption(fd, SOL_SOCKET, SO_KEEPALIVE, 1)) {
+ Cerr << Sprintf("Failed to set SO_KEEPALIVE option: %s", strerror(errno)) << Endl;
+ return false;
+ }
+#ifdef _linux_
+ if (Idle_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPIDLE, Idle_)) {
+ Cerr << Sprintf("Failed to set TCP_KEEPIDLE option: %s", strerror(errno)) << Endl;
+ return false;
+ }
+ if (Count_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPCNT, Count_)) {
+ Cerr << Sprintf("Failed to set TCP_KEEPCNT option: %s", strerror(errno)) << Endl;
+ return false;
+ }
+ if (Interval_ && !SetOption(fd, IPPROTO_TCP, TCP_KEEPINTVL, Interval_)) {
+ Cerr << Sprintf("Failed to set TCP_KEEPINTVL option: %s", strerror(errno)) << Endl;
+ return false;
+ }
+#endif
+ return true;
+ }
+ static bool Mutate(int fd, grpc_socket_mutator* mutator) {
+ auto self = Cast(mutator);
+ return self->SetOption(fd);
+ }
+ static int Compare(grpc_socket_mutator* a, grpc_socket_mutator* b) {
+ const auto* selfA = Cast(a);
+ const auto* selfB = Cast(b);
+ auto tupleA = std::make_tuple(selfA->Idle_, selfA->Count_, selfA->Interval_);
+ auto tupleB = std::make_tuple(selfB->Idle_, selfB->Count_, selfB->Interval_);
+ return tupleA < tupleB ? -1 : tupleA > tupleB ? 1 : 0;
+ }
+ static void Destroy(grpc_socket_mutator* mutator) {
+ delete Cast(mutator);
+ }
+
+ static grpc_socket_mutator_vtable VTable;
+ const int Idle_;
+ const int Count_;
+ const int Interval_;
+};
+
+grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
+ {
+ &TGRpcKeepAliveSocketMutator::Mutate,
+ &TGRpcKeepAliveSocketMutator::Compare,
+ &TGRpcKeepAliveSocketMutator::Destroy
+ };
+
TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
- : TcpKeepAliveSettings_(tcpKeepAliveSettings)
+ : TcpKeepAliveSettings_(tcpKeepAliveSettings)
, ExpireTime_(expireTime)
, UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
-{}
-
-void TChannelPool::GetStubsHolderLocked(
- const TString& channelId,
- const TGRpcClientConfig& config,
- std::function<void(TStubsHolder&)> cb)
-{
- {
- std::shared_lock readGuard(RWMutex_);
- const auto it = Pool_.find(channelId);
- if (it != Pool_.end()) {
+{}
+
+void TChannelPool::GetStubsHolderLocked(
+ const TString& channelId,
+ const TGRpcClientConfig& config,
+ std::function<void(TStubsHolder&)> cb)
+{
+ {
+ std::shared_lock readGuard(RWMutex_);
+ const auto it = Pool_.find(channelId);
+ if (it != Pool_.end()) {
if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
- return cb(it->second);
- }
- }
- }
- {
- std::unique_lock writeGuard(RWMutex_);
- {
- auto it = Pool_.find(channelId);
- if (it != Pool_.end()) {
- if (!it->second.IsChannelBroken()) {
+ return cb(it->second);
+ }
+ }
+ }
+ {
+ std::unique_lock writeGuard(RWMutex_);
+ {
+ auto it = Pool_.find(channelId);
+ if (it != Pool_.end()) {
+ if (!it->second.IsChannelBroken()) {
EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
auto now = Now();
LastUsedQueue_.emplace(now, channelId);
it->second.SetLastUseTime(now);
- return cb(it->second);
- } else {
- // This channel can't be used. Remove from pool to create new one
+ return cb(it->second);
+ } else {
+ // This channel can't be used. Remove from pool to create new one
EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
- Pool_.erase(it);
- }
- }
- }
- TGRpcKeepAliveSocketMutator* mutator = nullptr;
- // will be destroyed inside grpc
- if (TcpKeepAliveSettings_.Enabled) {
- mutator = new TGRpcKeepAliveSocketMutator(
- TcpKeepAliveSettings_.Idle,
- TcpKeepAliveSettings_.Count,
- TcpKeepAliveSettings_.Interval
- );
- }
- cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
+ Pool_.erase(it);
+ }
+ }
+ }
+ TGRpcKeepAliveSocketMutator* mutator = nullptr;
+ // will be destroyed inside grpc
+ if (TcpKeepAliveSettings_.Enabled) {
+ mutator = new TGRpcKeepAliveSocketMutator(
+ TcpKeepAliveSettings_.Idle,
+ TcpKeepAliveSettings_.Count,
+ TcpKeepAliveSettings_.Interval
+ );
+ }
+ cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
- }
-}
-
-void TChannelPool::DeleteChannel(const TString& channelId) {
- std::unique_lock writeLock(RWMutex_);
+ }
+}
+
+void TChannelPool::DeleteChannel(const TString& channelId) {
+ std::unique_lock writeLock(RWMutex_);
auto poolIt = Pool_.find(channelId);
if (poolIt != Pool_.end()) {
EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
Pool_.erase(poolIt);
}
-}
-
+}
+
void TChannelPool::DeleteExpiredStubsHolders() {
std::unique_lock writeLock(RWMutex_);
auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
@@ -392,7 +392,7 @@ private:
// Some children are stored inline, others are in a set
std::array<TContextImpl*, 2> InlineChildren{ { nullptr, nullptr } };
- std::unordered_set<TContextImpl*> Children;
+ std::unordered_set<TContextImpl*> Children;
// Single callback is stored without extra allocations
TStackVec<TCallback, 1> Callbacks;
@@ -404,10 +404,10 @@ private:
TGRpcClientLow::TGRpcClientLow(size_t numWorkerThread, bool useCompletionQueuePerThread)
: UseCompletionQueuePerThread_(useCompletionQueuePerThread)
{
- Init(numWorkerThread);
-}
-
-void TGRpcClientLow::Init(size_t numWorkerThread) {
+ Init(numWorkerThread);
+}
+
+void TGRpcClientLow::Init(size_t numWorkerThread) {
SetCqState(WORKING);
if (UseCompletionQueuePerThread_) {
for (size_t i = 0; i < numWorkerThread; i++) {
@@ -425,9 +425,9 @@ void TGRpcClientLow::Init(size_t numWorkerThread) {
PullEvents(cq);
}).Release());
}
- }
-}
-
+ }
+}
+
void TGRpcClientLow::AddWorkerThreadForTest() {
if (UseCompletionQueuePerThread_) {
CQS_.push_back(std::make_unique<grpc::CompletionQueue>());
@@ -453,17 +453,17 @@ void TGRpcClientLow::Stop(bool wait) {
if (wait) {
WaitInternal();
- }
-}
-
+ }
+}
+
void TGRpcClientLow::StopInternal(bool silent) {
bool shutdown;
TVector<TContextImpl::TContextPtr> cancelQueue;
- {
- std::unique_lock<std::mutex> guard(Mtx_);
-
+ {
+ std::unique_lock<std::mutex> guard(Mtx_);
+
auto allowStateChange = [&]() {
switch (GetCqState()) {
case WORKING:
@@ -484,7 +484,7 @@ void TGRpcClientLow::StopInternal(bool silent) {
SetCqState(silent ? STOP_SILENT : STOP_EXPLICIT);
- if (!silent && !Contexts_.empty()) {
+ if (!silent && !Contexts_.empty()) {
cancelQueue.reserve(Contexts_.size());
for (auto* ptr : Contexts_) {
// N.B. some contexts may be stuck in destructors
@@ -494,7 +494,7 @@ void TGRpcClientLow::StopInternal(bool silent) {
}
}
- shutdown = Contexts_.empty();
+ shutdown = Contexts_.empty();
}
for (auto& context : cancelQueue) {
@@ -506,62 +506,62 @@ void TGRpcClientLow::StopInternal(bool silent) {
for (auto& cq : CQS_) {
cq->Shutdown();
}
- }
+ }
}
void TGRpcClientLow::WaitInternal() {
- std::unique_lock<std::mutex> guard(JoinMutex_);
-
- for (auto& ti : WorkerThreads_) {
- ti->Join();
- }
-}
-
+ std::unique_lock<std::mutex> guard(JoinMutex_);
+
+ for (auto& ti : WorkerThreads_) {
+ ti->Join();
+ }
+}
+
void TGRpcClientLow::WaitIdle() {
- std::unique_lock<std::mutex> guard(Mtx_);
-
- while (!Contexts_.empty()) {
- ContextsEmpty_.wait(guard);
- }
-}
-
+ std::unique_lock<std::mutex> guard(Mtx_);
+
+ while (!Contexts_.empty()) {
+ ContextsEmpty_.wait(guard);
+ }
+}
+
std::shared_ptr<IQueueClientContext> TGRpcClientLow::CreateContext() {
- std::unique_lock<std::mutex> guard(Mtx_);
-
- auto allowCreateContext = [&]() {
- switch (GetCqState()) {
- case WORKING:
- return true;
- case STOP_SILENT:
- case STOP_EXPLICIT:
- return false;
+ std::unique_lock<std::mutex> guard(Mtx_);
+
+ auto allowCreateContext = [&]() {
+ switch (GetCqState()) {
+ case WORKING:
+ return true;
+ case STOP_SILENT:
+ case STOP_EXPLICIT:
+ return false;
}
- Y_UNREACHABLE();
- };
-
- if (!allowCreateContext()) {
- // New context creation is forbidden
- return nullptr;
- }
-
- auto context = std::make_shared<TContextImpl>();
- Contexts_.insert(context.get());
- context->Owner = this;
- if (UseCompletionQueuePerThread_) {
- context->CQ = CQS_[RandomNumber(CQS_.size())].get();
- } else {
- context->CQ = CQS_[0].get();
- }
- return context;
+ Y_UNREACHABLE();
+ };
+
+ if (!allowCreateContext()) {
+ // New context creation is forbidden
+ return nullptr;
+ }
+
+ auto context = std::make_shared<TContextImpl>();
+ Contexts_.insert(context.get());
+ context->Owner = this;
+ if (UseCompletionQueuePerThread_) {
+ context->CQ = CQS_[RandomNumber(CQS_.size())].get();
+ } else {
+ context->CQ = CQS_[0].get();
+ }
+ return context;
}
void TGRpcClientLow::ForgetContext(TContextImpl* context) {
bool shutdown = false;
- {
- std::unique_lock<std::mutex> guard(Mtx_);
-
+ {
+ std::unique_lock<std::mutex> guard(Mtx_);
+
if (!Contexts_.erase(context)) {
Y_FAIL("Unexpected ForgetContext(%p)", context);
}
@@ -571,7 +571,7 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) {
shutdown = true;
}
- ContextsEmpty_.notify_all();
+ ContextsEmpty_.notify_all();
}
}
@@ -580,7 +580,7 @@ void TGRpcClientLow::ForgetContext(TContextImpl* context) {
for (auto& cq : CQS_) {
cq->Shutdown();
}
- }
-}
-
+ }
+}
+
} // namespace NGRpc