aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/client/grpc_client_low.cpp
diff options
context:
space:
mode:
authorayasu <ayasu@yandex-team.ru>2022-02-10 16:50:18 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:18 +0300
commita1b790e06e8e99cffd5157a991259f9d0f993f66 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/client/grpc_client_low.cpp
parent9f1131cecd3c7aacc2f7f803e8820eccbfcea203 (diff)
downloadydb-a1b790e06e8e99cffd5157a991259f9d0f993f66.tar.gz
Restoring authorship annotation for <ayasu@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.cpp')
-rw-r--r--library/cpp/grpc/client/grpc_client_low.cpp62
1 files changed, 31 insertions, 31 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp
index b773d6a3d1..73cc908ef8 100644
--- a/library/cpp/grpc/client/grpc_client_low.cpp
+++ b/library/cpp/grpc/client/grpc_client_low.cpp
@@ -98,10 +98,10 @@ grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
&TGRpcKeepAliveSocketMutator::Destroy
};
-TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
+TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
: TcpKeepAliveSettings_(tcpKeepAliveSettings)
- , ExpireTime_(expireTime)
- , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
+ , ExpireTime_(expireTime)
+ , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
{}
void TChannelPool::GetStubsHolderLocked(
@@ -113,7 +113,7 @@ void TChannelPool::GetStubsHolderLocked(
std::shared_lock readGuard(RWMutex_);
const auto it = Pool_.find(channelId);
if (it != Pool_.end()) {
- if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
+ if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
return cb(it->second);
}
}
@@ -124,14 +124,14 @@ void TChannelPool::GetStubsHolderLocked(
auto it = Pool_.find(channelId);
if (it != Pool_.end()) {
if (!it->second.IsChannelBroken()) {
- EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
- auto now = Now();
- LastUsedQueue_.emplace(now, channelId);
- it->second.SetLastUseTime(now);
+ EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
+ auto now = Now();
+ LastUsedQueue_.emplace(now, channelId);
+ it->second.SetLastUseTime(now);
return cb(it->second);
} else {
// This channel can't be used. Remove from pool to create new one
- EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
+ EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
Pool_.erase(it);
}
}
@@ -146,35 +146,35 @@ void TChannelPool::GetStubsHolderLocked(
);
}
cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
- LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
+ LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
}
}
void TChannelPool::DeleteChannel(const TString& channelId) {
std::unique_lock writeLock(RWMutex_);
- auto poolIt = Pool_.find(channelId);
- if (poolIt != Pool_.end()) {
- EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
- Pool_.erase(poolIt);
- }
+ auto poolIt = Pool_.find(channelId);
+ if (poolIt != Pool_.end()) {
+ EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
+ Pool_.erase(poolIt);
+ }
+}
+
+void TChannelPool::DeleteExpiredStubsHolders() {
+ std::unique_lock writeLock(RWMutex_);
+ auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
+ for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){
+ Pool_.erase(i->second);
+ }
+ LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired);
+}
+
+void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) {
+ auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime);
+ auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;});
+ Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool");
+ LastUsedQueue_.erase(pos);
}
-void TChannelPool::DeleteExpiredStubsHolders() {
- std::unique_lock writeLock(RWMutex_);
- auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
- for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){
- Pool_.erase(i->second);
- }
- LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired);
-}
-
-void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) {
- auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime);
- auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;});
- Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool");
- LastUsedQueue_.erase(pos);
-}
-
static void PullEvents(grpc::CompletionQueue* cq) {
TThread::SetCurrentThreadName("grpc_client");
while (true) {