aboutsummaryrefslogtreecommitdiffstats
path: root/library
diff options
context:
space:
mode:
authorayasu <ayasu@yandex-team.ru>2022-02-10 16:50:18 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:18 +0300
commita1b790e06e8e99cffd5157a991259f9d0f993f66 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library
parent9f1131cecd3c7aacc2f7f803e8820eccbfcea203 (diff)
downloadydb-a1b790e06e8e99cffd5157a991259f9d0f993f66.tar.gz
Restoring authorship annotation for <ayasu@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library')
-rw-r--r--library/cpp/grpc/client/grpc_client_low.cpp62
-rw-r--r--library/cpp/grpc/client/grpc_client_low.h30
-rw-r--r--library/cpp/grpc/client/ut/grpc_client_low_ut.cpp122
-rw-r--r--library/cpp/grpc/client/ut/ya.make22
-rw-r--r--library/cpp/grpc/client/ya.make8
5 files changed, 122 insertions, 122 deletions
diff --git a/library/cpp/grpc/client/grpc_client_low.cpp b/library/cpp/grpc/client/grpc_client_low.cpp
index b773d6a3d1..73cc908ef8 100644
--- a/library/cpp/grpc/client/grpc_client_low.cpp
+++ b/library/cpp/grpc/client/grpc_client_low.cpp
@@ -98,10 +98,10 @@ grpc_socket_mutator_vtable TGRpcKeepAliveSocketMutator::VTable =
&TGRpcKeepAliveSocketMutator::Destroy
};
-TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
+TChannelPool::TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime)
: TcpKeepAliveSettings_(tcpKeepAliveSettings)
- , ExpireTime_(expireTime)
- , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
+ , ExpireTime_(expireTime)
+ , UpdateReUseTime_(ExpireTime_ * 0.3 < TDuration::Seconds(20) ? ExpireTime_ * 0.3 : TDuration::Seconds(20))
{}
void TChannelPool::GetStubsHolderLocked(
@@ -113,7 +113,7 @@ void TChannelPool::GetStubsHolderLocked(
std::shared_lock readGuard(RWMutex_);
const auto it = Pool_.find(channelId);
if (it != Pool_.end()) {
- if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
+ if (!it->second.IsChannelBroken() && !(Now() > it->second.GetLastUseTime() + UpdateReUseTime_)) {
return cb(it->second);
}
}
@@ -124,14 +124,14 @@ void TChannelPool::GetStubsHolderLocked(
auto it = Pool_.find(channelId);
if (it != Pool_.end()) {
if (!it->second.IsChannelBroken()) {
- EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
- auto now = Now();
- LastUsedQueue_.emplace(now, channelId);
- it->second.SetLastUseTime(now);
+ EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
+ auto now = Now();
+ LastUsedQueue_.emplace(now, channelId);
+ it->second.SetLastUseTime(now);
return cb(it->second);
} else {
// This channel can't be used. Remove from pool to create new one
- EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
+ EraseFromQueueByTime(it->second.GetLastUseTime(), channelId);
Pool_.erase(it);
}
}
@@ -146,35 +146,35 @@ void TChannelPool::GetStubsHolderLocked(
);
}
cb(Pool_.emplace(channelId, CreateChannelInterface(config, mutator)).first->second);
- LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
+ LastUsedQueue_.emplace(Pool_.at(channelId).GetLastUseTime(), channelId);
}
}
void TChannelPool::DeleteChannel(const TString& channelId) {
std::unique_lock writeLock(RWMutex_);
- auto poolIt = Pool_.find(channelId);
- if (poolIt != Pool_.end()) {
- EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
- Pool_.erase(poolIt);
- }
+ auto poolIt = Pool_.find(channelId);
+ if (poolIt != Pool_.end()) {
+ EraseFromQueueByTime(poolIt->second.GetLastUseTime(), channelId);
+ Pool_.erase(poolIt);
+ }
+}
+
+void TChannelPool::DeleteExpiredStubsHolders() {
+ std::unique_lock writeLock(RWMutex_);
+ auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
+ for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){
+ Pool_.erase(i->second);
+ }
+ LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired);
+}
+
+void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) {
+ auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime);
+ auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;});
+ Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool");
+ LastUsedQueue_.erase(pos);
}
-void TChannelPool::DeleteExpiredStubsHolders() {
- std::unique_lock writeLock(RWMutex_);
- auto lastExpired = LastUsedQueue_.lower_bound(Now() - ExpireTime_);
- for (auto i = LastUsedQueue_.begin(); i != lastExpired; ++i){
- Pool_.erase(i->second);
- }
- LastUsedQueue_.erase(LastUsedQueue_.begin(), lastExpired);
-}
-
-void TChannelPool::EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId) {
- auto [begin, end] = LastUsedQueue_.equal_range(lastUseTime);
- auto pos = std::find_if(begin, end, [&](auto a){return a.second == channelId;});
- Y_VERIFY(pos != LastUsedQueue_.end(), "data corruption at TChannelPool");
- LastUsedQueue_.erase(pos);
-}
-
static void PullEvents(grpc::CompletionQueue* cq) {
TThread::SetCurrentThreadName("grpc_client");
while (true) {
diff --git a/library/cpp/grpc/client/grpc_client_low.h b/library/cpp/grpc/client/grpc_client_low.h
index 07bf1dd1e0..ab0a0627be 100644
--- a/library/cpp/grpc/client/grpc_client_low.h
+++ b/library/cpp/grpc/client/grpc_client_low.h
@@ -486,16 +486,16 @@ public:
}
}
}
-
- const TInstant& GetLastUseTime() const {
- return LastUsed_;
- }
-
- void SetLastUseTime(const TInstant& time) {
- LastUsed_ = time;
- }
+
+ const TInstant& GetLastUseTime() const {
+ return LastUsed_;
+ }
+
+ void SetLastUseTime(const TInstant& time) {
+ LastUsed_ = time;
+ }
private:
- TInstant LastUsed_ = Now();
+ TInstant LastUsed_ = Now();
std::shared_mutex RWMutex_;
std::unordered_map<TypeInfoRef, std::shared_ptr<void>, THasher, TEqualTo> Stubs_;
std::shared_ptr<grpc::ChannelInterface> ChannelInterface_;
@@ -503,20 +503,20 @@ private:
class TChannelPool {
public:
- TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime = TDuration::Minutes(6));
+ TChannelPool(const TTcpKeepAliveSettings& tcpKeepAliveSettings, const TDuration& expireTime = TDuration::Minutes(6));
//Allows to CreateStub from TStubsHolder under lock
//The callback will be called just during GetStubsHolderLocked call
void GetStubsHolderLocked(const TString& channelId, const TGRpcClientConfig& config, std::function<void(TStubsHolder&)> cb);
void DeleteChannel(const TString& channelId);
- void DeleteExpiredStubsHolders();
+ void DeleteExpiredStubsHolders();
private:
std::shared_mutex RWMutex_;
std::unordered_map<TString, TStubsHolder> Pool_;
- std::multimap<TInstant, TString> LastUsedQueue_;
+ std::multimap<TInstant, TString> LastUsedQueue_;
TTcpKeepAliveSettings TcpKeepAliveSettings_;
- TDuration ExpireTime_;
- TDuration UpdateReUseTime_;
- void EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId);
+ TDuration ExpireTime_;
+ TDuration UpdateReUseTime_;
+ void EraseFromQueueByTime(const TInstant& lastUseTime, const TString& channelId);
};
template<class TResponse>
diff --git a/library/cpp/grpc/client/ut/grpc_client_low_ut.cpp b/library/cpp/grpc/client/ut/grpc_client_low_ut.cpp
index b119b9d9af..b8af2a518f 100644
--- a/library/cpp/grpc/client/ut/grpc_client_low_ut.cpp
+++ b/library/cpp/grpc/client/ut/grpc_client_low_ut.cpp
@@ -1,61 +1,61 @@
-#include <library/cpp/grpc/client/grpc_client_low.h>
-
-#include <library/cpp/testing/unittest/registar.h>
-
-using namespace NGrpc;
-
-class TTestStub {
-public:
- std::shared_ptr<grpc::ChannelInterface> ChannelInterface;
- TTestStub(std::shared_ptr<grpc::ChannelInterface> channelInterface)
- : ChannelInterface(channelInterface)
- {}
-};
-
-Y_UNIT_TEST_SUITE(ChannelPoolTests) {
- Y_UNIT_TEST(UnusedStubsHoldersDeletion) {
- TGRpcClientConfig clientConfig("invalid_host:invalid_port");
- TTcpKeepAliveSettings tcpKeepAliveSettings =
- {
- true,
- 30, // NYdb::TCP_KEEPALIVE_IDLE, unused in UT, but is necessary in constructor
- 5, // NYdb::TCP_KEEPALIVE_COUNT, unused in UT, but is necessary in constructor
- 10 // NYdb::TCP_KEEPALIVE_INTERVAL, unused in UT, but is necessary in constructor
- };
- auto channelPool = TChannelPool(tcpKeepAliveSettings, TDuration::MilliSeconds(250));
- std::vector<std::weak_ptr<grpc::ChannelInterface>> ChannelInterfacesWeak;
-
- {
- std::vector<std::shared_ptr<TTestStub>> stubsHoldersShared;
- auto storeStubsHolders = [&](TStubsHolder& stubsHolder) {
- stubsHoldersShared.emplace_back(stubsHolder.GetOrCreateStub<TTestStub>());
- ChannelInterfacesWeak.emplace_back((*stubsHoldersShared.rbegin())->ChannelInterface);
- return;
- };
- for (int i = 0; i < 10; ++i) {
- channelPool.GetStubsHolderLocked(
- ToString(i),
- clientConfig,
- storeStubsHolders
- );
- }
- }
-
- auto now = Now();
- while (Now() < now + TDuration::MilliSeconds(500)){
- Sleep(TDuration::MilliSeconds(100));
- }
-
- channelPool.DeleteExpiredStubsHolders();
-
- bool allDeleted = true;
- for (auto i = ChannelInterfacesWeak.begin(); i != ChannelInterfacesWeak.end(); ++i) {
- allDeleted = allDeleted && i->expired();
- }
-
- // assertion is made for channel interfaces instead of stubs, because after stub deletion
- // TStubsHolder has the only shared_ptr for channel interface.
- UNIT_ASSERT_C(allDeleted, "expired stubsHolders were not deleted after timeout");
-
- }
-} // ChannelPoolTests ut suite \ No newline at end of file
+#include <library/cpp/grpc/client/grpc_client_low.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+using namespace NGrpc;
+
+class TTestStub {
+public:
+ std::shared_ptr<grpc::ChannelInterface> ChannelInterface;
+ TTestStub(std::shared_ptr<grpc::ChannelInterface> channelInterface)
+ : ChannelInterface(channelInterface)
+ {}
+};
+
+Y_UNIT_TEST_SUITE(ChannelPoolTests) {
+ Y_UNIT_TEST(UnusedStubsHoldersDeletion) {
+ TGRpcClientConfig clientConfig("invalid_host:invalid_port");
+ TTcpKeepAliveSettings tcpKeepAliveSettings =
+ {
+ true,
+ 30, // NYdb::TCP_KEEPALIVE_IDLE, unused in UT, but is necessary in constructor
+ 5, // NYdb::TCP_KEEPALIVE_COUNT, unused in UT, but is necessary in constructor
+ 10 // NYdb::TCP_KEEPALIVE_INTERVAL, unused in UT, but is necessary in constructor
+ };
+ auto channelPool = TChannelPool(tcpKeepAliveSettings, TDuration::MilliSeconds(250));
+ std::vector<std::weak_ptr<grpc::ChannelInterface>> ChannelInterfacesWeak;
+
+ {
+ std::vector<std::shared_ptr<TTestStub>> stubsHoldersShared;
+ auto storeStubsHolders = [&](TStubsHolder& stubsHolder) {
+ stubsHoldersShared.emplace_back(stubsHolder.GetOrCreateStub<TTestStub>());
+ ChannelInterfacesWeak.emplace_back((*stubsHoldersShared.rbegin())->ChannelInterface);
+ return;
+ };
+ for (int i = 0; i < 10; ++i) {
+ channelPool.GetStubsHolderLocked(
+ ToString(i),
+ clientConfig,
+ storeStubsHolders
+ );
+ }
+ }
+
+ auto now = Now();
+ while (Now() < now + TDuration::MilliSeconds(500)){
+ Sleep(TDuration::MilliSeconds(100));
+ }
+
+ channelPool.DeleteExpiredStubsHolders();
+
+ bool allDeleted = true;
+ for (auto i = ChannelInterfacesWeak.begin(); i != ChannelInterfacesWeak.end(); ++i) {
+ allDeleted = allDeleted && i->expired();
+ }
+
+ // assertion is made for channel interfaces instead of stubs, because after stub deletion
+ // TStubsHolder has the only shared_ptr for channel interface.
+ UNIT_ASSERT_C(allDeleted, "expired stubsHolders were not deleted after timeout");
+
+ }
+} // ChannelPoolTests ut suite \ No newline at end of file
diff --git a/library/cpp/grpc/client/ut/ya.make b/library/cpp/grpc/client/ut/ya.make
index 8ca54c0342..eac779a99e 100644
--- a/library/cpp/grpc/client/ut/ya.make
+++ b/library/cpp/grpc/client/ut/ya.make
@@ -1,11 +1,11 @@
-UNITTEST_FOR(library/cpp/grpc/client)
-
-OWNER(
- g:kikimr
-)
-
-SRCS(
- grpc_client_low_ut.cpp
-)
-
-END()
+UNITTEST_FOR(library/cpp/grpc/client)
+
+OWNER(
+ g:kikimr
+)
+
+SRCS(
+ grpc_client_low_ut.cpp
+)
+
+END()
diff --git a/library/cpp/grpc/client/ya.make b/library/cpp/grpc/client/ya.make
index 120f48429a..a4e74b067c 100644
--- a/library/cpp/grpc/client/ya.make
+++ b/library/cpp/grpc/client/ya.make
@@ -14,7 +14,7 @@ PEERDIR(
)
END()
-
-RECURSE_FOR_TESTS(
- ut
-) \ No newline at end of file
+
+RECURSE_FOR_TESTS(
+ ut
+) \ No newline at end of file