diff options
author | ayasu <ayasu@yandex-team.ru> | 2022-02-10 16:50:18 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:18 +0300 |
commit | 9f1131cecd3c7aacc2f7f803e8820eccbfcea203 (patch) | |
tree | a6c43735ae7ed78a6d9c42ed7eaaef7226e2b519 /library/cpp/grpc/client/grpc_client_low.cpp | |
parent | 4b11037e5a7d071c63e3c966199fe7102e6462e4 (diff) | |
download | ydb-9f1131cecd3c7aacc2f7f803e8820eccbfcea203.tar.gz |
Restoring authorship annotation for <ayasu@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'library/cpp/grpc/client/grpc_client_low.cpp')
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.cpp | 62 |
1 files changed, 31 insertions, 31 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp index 73cc908ef8..b773d6a3d1 100644 --- a/library/cpp/grpc/client/grpc_client_low.cpp +++ b/library/cpp/grpc/client/grpc_client_low.cpp @@ -98,10 +98,10 @@ grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable = &TGRpcKeepAliveSocketMutator::Destroy }; -TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime) +TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime) : TcpKeepAliveSettings_(tcpKeepAliveSettings) - , ExpireTime_(expireTime) - , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20)) + , ExpireTime_(expireTime) + , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20)) {} void TChannelPool::GetStubsHolderLocked( @@ -113,7 +113,7 @@ void TChannelPool::GetStubsHolderLocked( std::shared_lock readGuard(RWMutex_); const auto it = Pool_.find(channelId); if (it != Pool_.end()) { - if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) { + if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) { return cb(it->second); } } @@ -124,14 +124,14 @@ void TChannelPool::GetStubsHolderLocked( auto it = Pool_.find(channelId); if (it != Pool_.end()) { if (!it->second.IsChannelBroken()) { - EraseFromQueueByTime(it->second.GetLastUseTime(), channelId); - auto now = Now(); - LastUsedQueue_.emplace(now, channelId); - it->second.SetLastUseTime(now); + EraseFromQueueByTime(it->second.GetLastUseTime(), channelId); + auto now = Now(); + LastUsedQueue_.emplace(now, channelId); + it->second.SetLastUseTime(now); return cb(it->second); } else { // This channel can't be used. Remove from pool to create new one - EraseFromQueueByTime(it->second.GetLastUseTime(), channelId); + EraseFromQueueByTime(it->second.GetLastUseTime(), channelId); Pool_.erase(it); } } @@ -146,35 +146,35 @@ void TChannelPool::GetStubsHolderLocked( ); } cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second); - LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId); + LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId); } } void TChannelPool::DeleteChannel(const TString& channelId) { std::unique_lock writeLock(RWMutex_); - auto poolIt = Pool_.find(channelId); - if (poolIt != Pool_.end()) { - EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId); - Pool_.erase(poolIt); - } -} - -void TChannelPool::DeleteExpiredStubsHolders() { - std::unique_lock writeLock(RWMutex_); - auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_); - for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){ - Pool_.erase(i->second); - } - LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired); -} - -void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) { - auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime); - auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;}); - Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool"); - LastUsedQueue_.erase(pos); + auto poolIt = Pool_.find(channelId); + if (poolIt != Pool_.end()) { + EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId); + Pool_.erase(poolIt); + } } +void TChannelPool::DeleteExpiredStubsHolders() { + std::unique_lock writeLock(RWMutex_); + auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_); + for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){ + Pool_.erase(i->second); + } + LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired); +} + +void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) { + auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime); + auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;}); + Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool"); + LastUsedQueue_.erase(pos); +} + static void PullEvents(grpc::CompletionQueue* cq) { TThread::SetCurrentThreadName("grpc_client"); while (true) { |