aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/client/grpc_client_low.cpp
diff options
context:
space:
mode:
authorayasu <ayasu@yandex-team.ru>2022-02-10 16:50:18 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:18 +0300
commit9f1131cecd3c7aacc2f7f803e8820eccbfcea203 (patch)
treea6c43735ae7ed78a6d9c42ed7eaaef7226e2b519 /library/cpp/grpc/client/grpc_client_low.cpp
parent4b11037e5a7d071c63e3c966199fe7102e6462e4 (diff)
downloadydb-9f1131cecd3c7aacc2f7f803e8820eccbfcea203.tar.gz
Restoring authorship annotation for <ayasu@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.cpp')
-rw-r--r--library/cpp/grpc/client/grpc_client_low.cpp62
1 files changed, 31 insertions, 31 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp
index 73cc908ef8..b773d6a3d1 100644
--- a/library/cpp/grpc/client/grpc_client_low.cpp
+++ b/library/cpp/grpc/client/grpc_client_low.cpp
@@ -98,10 +98,10 @@ grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
&TGRpcKeepAliveSocketMutator::Destroy
};
-TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
+TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
: TcpKeepAliveSettings_(tcpKeepAliveSettings)
- , ExpireTime_(expireTime)
- , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
+ , ExpireTime_(expireTime)
+ , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
{}
void TChannelPool::GetStubsHolderLocked(
@@ -113,7 +113,7 @@ void TChannelPool::GetStubsHolderLocked(
std::shared_lock readGuard(RWMutex_);
const auto it = Pool_.find(channelId);
if (it != Pool_.end()) {
- if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
+ if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
return cb(it->second);
}
}
@@ -124,14 +124,14 @@ void TChannelPool::GetStubsHolderLocked(
auto it = Pool_.find(channelId);
if (it != Pool_.end()) {
if (!it->second.IsChannelBroken()) {
- EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
- auto now = Now();
- LastUsedQueue_.emplace(now, channelId);
- it->second.SetLastUseTime(now);
+ EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
+ auto now = Now();
+ LastUsedQueue_.emplace(now, channelId);
+ it->second.SetLastUseTime(now);
return cb(it->second);
} else {
// This channel can't be used. Remove from pool to create new one
- EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
+ EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
Pool_.erase(it);
}
}
@@ -146,35 +146,35 @@ void TChannelPool::GetStubsHolderLocked(
);
}
cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
- LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
+ LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
}
}
void TChannelPool::DeleteChannel(const TString& channelId) {
std::unique_lock writeLock(RWMutex_);
- auto poolIt = Pool_.find(channelId);
- if (poolIt != Pool_.end()) {
- EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
- Pool_.erase(poolIt);
- }
-}
-
-void TChannelPool::DeleteExpiredStubsHolders() {
- std::unique_lock writeLock(RWMutex_);
- auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
- for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){
- Pool_.erase(i->second);
- }
- LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired);
-}
-
-void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) {
- auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime);
- auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;});
- Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool");
- LastUsedQueue_.erase(pos);
+ auto poolIt = Pool_.find(channelId);
+ if (poolIt != Pool_.end()) {
+ EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
+ Pool_.erase(poolIt);
+ }
}
+void TChannelPool::DeleteExpiredStubsHolders() {
+ std::unique_lock writeLock(RWMutex_);
+ auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
+ for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){
+ Pool_.erase(i->second);
+ }
+ LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired);
+}
+
+void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) {
+ auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime);
+ auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;});
+ Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool");
+ LastUsedQueue_.erase(pos);
+}
+
static void PullEvents(grpc::CompletionQueue* cq) {
TThread::SetCurrentThreadName("grpc_client");
while (true) {