diff options
author | ayasu <ayasu@yandex-team.ru> | 2022-02-10 16:50:18 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:18 +0300 |
commit | a1b790e06e8e99cffd5157a991259f9d0f993f66 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library | |
parent | 9f1131cecd3c7aacc2f7f803e8820eccbfcea203 (diff) | |
download | ydb-a1b790e06e8e99cffd5157a991259f9d0f993f66.tar.gz |
Restoring authorship annotation for <ayasu@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library')
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.cpp | 62 | ||||
-rw-r--r-- | library/cpp/grpc/client/grpc_client_low.h | 30 | ||||
-rw-r--r-- | library/cpp/grpc/client/ut/grpc_client_low_ut.cpp | 122 | ||||
-rw-r--r-- | library/cpp/grpc/client/ut/ya.make | 22 | ||||
-rw-r--r-- | library/cpp/grpc/client/ya.make | 8 |
5 files changed, 122 insertions, 122 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp index b773d6a3d1..73cc908ef8 100644 --- a/library/cpp/grpc/client/grpc_client_low.cpp +++ b/library/cpp/grpc/client/grpc_client_low.cpp @@ -98,10 +98,10 @@ grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable = &TGRpcKeepAliveSocketMutator::Destroy }; -TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime) +TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime) : TcpKeepAliveSettings_(tcpKeepAliveSettings) - , ExpireTime_(expireTime) - , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20)) + , ExpireTime_(expireTime) + , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20)) {} void TChannelPool::GetStubsHolderLocked( @@ -113,7 +113,7 @@ void TChannelPool::GetStubsHolderLocked( std::shared_lock readGuard(RWMutex_); const auto it = Pool_.find(channelId); if (it != Pool_.end()) { - if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) { + if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) { return cb(it->second); } } @@ -124,14 +124,14 @@ void TChannelPool::GetStubsHolderLocked( auto it = Pool_.find(channelId); if (it != Pool_.end()) { if (!it->second.IsChannelBroken()) { - EraseFromQueueByTime(it->second.GetLastUseTime(), channelId); - auto now = Now(); - LastUsedQueue_.emplace(now, channelId); - it->second.SetLastUseTime(now); + EraseFromQueueByTime(it->second.GetLastUseTime(), channelId); + auto now = Now(); + LastUsedQueue_.emplace(now, channelId); + it->second.SetLastUseTime(now); return cb(it->second); } else { // This channel can't be used. Remove from pool to create new one - EraseFromQueueByTime(it->second.GetLastUseTime(), channelId); + EraseFromQueueByTime(it->second.GetLastUseTime(), channelId); Pool_.erase(it); } } @@ -146,35 +146,35 @@ void TChannelPool::GetStubsHolderLocked( ); } cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second); - LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId); + LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId); } } void TChannelPool::DeleteChannel(const TString& channelId) { std::unique_lock writeLock(RWMutex_); - auto poolIt = Pool_.find(channelId); - if (poolIt != Pool_.end()) { - EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId); - Pool_.erase(poolIt); - } + auto poolIt = Pool_.find(channelId); + if (poolIt != Pool_.end()) { + EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId); + Pool_.erase(poolIt); + } +} + +void TChannelPool::DeleteExpiredStubsHolders() { + std::unique_lock writeLock(RWMutex_); + auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_); + for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){ + Pool_.erase(i->second); + } + LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired); +} + +void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) { + auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime); + auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;}); + Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool"); + LastUsedQueue_.erase(pos); } -void TChannelPool::DeleteExpiredStubsHolders() { - std::unique_lock writeLock(RWMutex_); - auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_); - for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){ - Pool_.erase(i->second); - } - LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired); -} - -void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) { - auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime); - auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;}); - Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool"); - LastUsedQueue_.erase(pos); -} - static void PullEvents(grpc::CompletionQueue* cq) { TThread::SetCurrentThreadName("grpc_client"); while (true) { diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h index 07bf1dd1e0..ab0a0627be 100644 --- a/library/cpp/grpc/client/grpc_client_low.h +++ b/library/cpp/grpc/client/grpc_client_low.h @@ -486,16 +486,16 @@ public: } } } - - const TInstant& GetLastUseTime() const { - return LastUsed_; - } - - void SetLastUseTime(const TInstant& time) { - LastUsed_ = time; - } + + const TInstant& GetLastUseTime() const { + return LastUsed_; + } + + void SetLastUseTime(const TInstant& time) { + LastUsed_ = time; + } private: - TInstant LastUsed_ = Now(); + TInstant LastUsed_ = Now(); std::shared_mutex RWMutex_; std::unordered_map<TypeInfoRef, std::shared_ptr<void>, THasher, TEqualTo> Stubs_; std::shared_ptr<grpc::ChannelInterface> ChannelInterface_; @@ -503,20 +503,20 @@ private: class TChannelPool { public: - TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime = TDuration::Minutes(6)); + TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime = TDuration::Minutes(6)); //Allows to CreateStub from TStubsHolder under lock //The callback will be called just during GetStubsHolderLocked call void GetStubsHolderLocked(const TString& channelId, const TGRpcClientConfig& config, std::function<void(TStubsHolder&)> cb); void DeleteChannel(const TString& channelId); - void DeleteExpiredStubsHolders(); + void DeleteExpiredStubsHolders(); private: std::shared_mutex RWMutex_; std::unordered_map<TString, TStubsHolder> Pool_; - std::multimap<TInstant, TString> LastUsedQueue_; + std::multimap<TInstant, TString> LastUsedQueue_; TTcpKeepAliveSettings TcpKeepAliveSettings_; - TDuration ExpireTime_; - TDuration UpdateReUseTime_; - void EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId); + TDuration ExpireTime_; + TDuration UpdateReUseTime_; + void EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId); }; template<class TResponse> diff --git a/library/cpp/grpc/client/ut/grpc_client_low_ut.cpp b/library/cpp/grpc/client/ut/grpc_client_low_ut.cpp index b119b9d9af..b8af2a518f 100644 --- a/library/cpp/grpc/client/ut/grpc_client_low_ut.cpp +++ b/library/cpp/grpc/client/ut/grpc_client_low_ut.cpp @@ -1,61 +1,61 @@ -#include <library/cpp/grpc/client/grpc_client_low.h> - -#include <library/cpp/testing/unittest/registar.h> - -using namespace NGrpc; - -class TTestStub { -public: - std::shared_ptr<grpc::ChannelInterface> ChannelInterface; - TTestStub(std::shared_ptr<grpc::ChannelInterface> channelInterface) - : ChannelInterface(channelInterface) - {} -}; - -Y_UNIT_TEST_SUITE(ChannelPoolTests) { - Y_UNIT_TEST(UnusedStubsHoldersDeletion) { - TGRpcClientConfig clientConfig("invalid_host:invalid_port"); - TTcpKeepAliveSettings tcpKeepAliveSettings = - { - true, - 30, // NYdb::TCP_KEEPALIVE_IDLE, unused in UT, but is necessary in constructor - 5, // NYdb::TCP_KEEPALIVE_COUNT, unused in UT, but is necessary in constructor - 10 // NYdb::TCP_KEEPALIVE_INTERVAL, unused in UT, but is necessary in constructor - }; - auto channelPool = TChannelPool(tcpKeepAliveSettings, TDuration::MilliSeconds(250)); - std::vector<std::weak_ptr<grpc::ChannelInterface>> ChannelInterfacesWeak; - - { - std::vector<std::shared_ptr<TTestStub>> stubsHoldersShared; - auto storeStubsHolders = [&](TStubsHolder& stubsHolder) { - stubsHoldersShared.emplace_back(stubsHolder.GetOrCreateStub<TTestStub>()); - ChannelInterfacesWeak.emplace_back((*stubsHoldersShared.rbegin())->ChannelInterface); - return; - }; - for (int i = 0; i < 10; ++i) { - channelPool.GetStubsHolderLocked( - ToString(i), - clientConfig, - storeStubsHolders - ); - } - } - - auto now = Now(); - while (Now() < now + TDuration::MilliSeconds(500)){ - Sleep(TDuration::MilliSeconds(100)); - } - - channelPool.DeleteExpiredStubsHolders(); - - bool allDeleted = true; - for (auto i = ChannelInterfacesWeak.begin(); i != ChannelInterfacesWeak.end(); ++i) { - allDeleted = allDeleted && i->expired(); - } - - // assertion is made for channel interfaces instead of stubs, because after stub deletion - // TStubsHolder has the only shared_ptr for channel interface. - UNIT_ASSERT_C(allDeleted, "expired stubsHolders were not deleted after timeout"); - - } -} // ChannelPoolTests ut suite
\ No newline at end of file +#include <library/cpp/grpc/client/grpc_client_low.h> + +#include <library/cpp/testing/unittest/registar.h> + +using namespace NGrpc; + +class TTestStub { +public: + std::shared_ptr<grpc::ChannelInterface> ChannelInterface; + TTestStub(std::shared_ptr<grpc::ChannelInterface> channelInterface) + : ChannelInterface(channelInterface) + {} +}; + +Y_UNIT_TEST_SUITE(ChannelPoolTests) { + Y_UNIT_TEST(UnusedStubsHoldersDeletion) { + TGRpcClientConfig clientConfig("invalid_host:invalid_port"); + TTcpKeepAliveSettings tcpKeepAliveSettings = + { + true, + 30, // NYdb::TCP_KEEPALIVE_IDLE, unused in UT, but is necessary in constructor + 5, // NYdb::TCP_KEEPALIVE_COUNT, unused in UT, but is necessary in constructor + 10 // NYdb::TCP_KEEPALIVE_INTERVAL, unused in UT, but is necessary in constructor + }; + auto channelPool = TChannelPool(tcpKeepAliveSettings, TDuration::MilliSeconds(250)); + std::vector<std::weak_ptr<grpc::ChannelInterface>> ChannelInterfacesWeak; + + { + std::vector<std::shared_ptr<TTestStub>> stubsHoldersShared; + auto storeStubsHolders = [&](TStubsHolder& stubsHolder) { + stubsHoldersShared.emplace_back(stubsHolder.GetOrCreateStub<TTestStub>()); + ChannelInterfacesWeak.emplace_back((*stubsHoldersShared.rbegin())->ChannelInterface); + return; + }; + for (int i = 0; i < 10; ++i) { + channelPool.GetStubsHolderLocked( + ToString(i), + clientConfig, + storeStubsHolders + ); + } + } + + auto now = Now(); + while (Now() < now + TDuration::MilliSeconds(500)){ + Sleep(TDuration::MilliSeconds(100)); + } + + channelPool.DeleteExpiredStubsHolders(); + + bool allDeleted = true; + for (auto i = ChannelInterfacesWeak.begin(); i != ChannelInterfacesWeak.end(); ++i) { + allDeleted = allDeleted && i->expired(); + } + + // assertion is made for channel interfaces instead of stubs, because after stub deletion + // TStubsHolder has the only shared_ptr for channel interface. + UNIT_ASSERT_C(allDeleted, "expired stubsHolders were not deleted after timeout"); + + } +} // ChannelPoolTests ut suite
\ No newline at end of file diff --git a/library/cpp/grpc/client/ut/ya.make b/library/cpp/grpc/client/ut/ya.make index 8ca54c0342..eac779a99e 100644 --- a/library/cpp/grpc/client/ut/ya.make +++ b/library/cpp/grpc/client/ut/ya.make @@ -1,11 +1,11 @@ -UNITTEST_FOR(library/cpp/grpc/client) - -OWNER( - g:kikimr -) - -SRCS( - grpc_client_low_ut.cpp -) - -END() +UNITTEST_FOR(library/cpp/grpc/client) + +OWNER( + g:kikimr +) + +SRCS( + grpc_client_low_ut.cpp +) + +END() diff --git a/library/cpp/grpc/client/ya.make b/library/cpp/grpc/client/ya.make index 120f48429a..a4e74b067c 100644 --- a/library/cpp/grpc/client/ya.make +++ b/library/cpp/grpc/client/ya.make @@ -14,7 +14,7 @@ PEERDIR( ) END() - -RECURSE_FOR_TESTS( - ut -)
\ No newline at end of file + +RECURSE_FOR_TESTS( + ut +)
\ No newline at end of file |