aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-09-18 14:37:21 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-09-18 14:56:06 +0300
commit9b26f8a1cd66768ee09d6a9b7a7913e813b30f6d (patch)
tree970bae605deb7a90a6eb1df21f4e76467e64e9da
parent4c3bb80b2d126d36d69d0f4842170025eb04914a (diff)
downloadydb-9b26f8a1cd66768ee09d6a9b7a7913e813b30f6d.tar.gz
Intermediate changes
commit_hash:6fdf8d3d31a6b3c0ec54292ffcd6919b46d4f2a7
-rw-r--r--yt/yt/core/rpc/unittests/lib/test_service.cpp42
-rw-r--r--yt/yt/core/rpc/unittests/lib/test_service.h7
-rw-r--r--yt/yt/core/rpc/unittests/lib/test_service.proto11
-rw-r--r--yt/yt/core/rpc/unittests/rpc_ut.cpp34
4 files changed, 79 insertions, 15 deletions
diff --git a/yt/yt/core/rpc/unittests/lib/test_service.cpp b/yt/yt/core/rpc/unittests/lib/test_service.cpp
index f0994b55e3..afd60131da 100644
--- a/yt/yt/core/rpc/unittests/lib/test_service.cpp
+++ b/yt/yt/core/rpc/unittests/lib/test_service.cpp
@@ -20,6 +20,10 @@ using namespace NConcurrency;
////////////////////////////////////////////////////////////////////////////////
+YT_DEFINE_GLOBAL(std::unique_ptr<NThreading::TEvent>, Latch_);
+
+////////////////////////////////////////////////////////////////////////////////
+
class TTestService
: public ITestService
, public TServiceBase
@@ -54,6 +58,10 @@ public:
.SetQueueSizeLimit(20)
.SetConcurrencyByteLimit(10_MB)
.SetQueueByteSizeLimit(20_MB));
+ RegisterMethod(RPC_SERVICE_METHOD_DESC(LatchedCall)
+ .SetCancelable(true)
+ .SetConcurrencyLimit(10)
+ .SetQueueSizeLimit(20));
RegisterMethod(RPC_SERVICE_METHOD_DESC(SlowCanceledCall)
.SetCancelable(true));
RegisterMethod(RPC_SERVICE_METHOD_DESC(RequestBytesThrottledCall));
@@ -166,6 +174,15 @@ public:
context->Reply();
}
+ DECLARE_RPC_SERVICE_METHOD(NTestRpc, LatchedCall)
+ {
+ context->SetRequestInfo();
+ if (request->wait_on_latch()) {
+ Latch_()->Wait();
+ }
+ context->Reply();
+ }
+
DECLARE_RPC_SERVICE_METHOD(NTestRpc, SlowCanceledCall)
{
try {
@@ -396,4 +413,29 @@ ITestServicePtr CreateTestService(
////////////////////////////////////////////////////////////////////////////////
+void ReleaseLatchedCalls()
+{
+ if (!Latch_()) {
+ return;
+ }
+
+ Latch_()->NotifyAll();
+}
+
+void MaybeInitLatch()
+{
+ if (!Latch_()) {
+ Latch_() = std::make_unique<NThreading::TEvent>();
+ }
+}
+
+void ResetLatch()
+{
+ if (Latch_()) {
+ Latch_().reset();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NRpc
diff --git a/yt/yt/core/rpc/unittests/lib/test_service.h b/yt/yt/core/rpc/unittests/lib/test_service.h
index 3bc224eee8..bfb9ebd149 100644
--- a/yt/yt/core/rpc/unittests/lib/test_service.h
+++ b/yt/yt/core/rpc/unittests/lib/test_service.h
@@ -33,6 +33,7 @@ public:
DEFINE_RPC_PROXY_METHOD(NTestRpc, NotRegistered);
DEFINE_RPC_PROXY_METHOD(NTestRpc, SlowCall);
DEFINE_RPC_PROXY_METHOD(NTestRpc, SlowCanceledCall);
+ DEFINE_RPC_PROXY_METHOD(NTestRpc, LatchedCall);
DEFINE_RPC_PROXY_METHOD(NTestRpc, NoReply);
DEFINE_RPC_PROXY_METHOD(NTestRpc, FlakyCall);
DEFINE_RPC_PROXY_METHOD(NTestRpc, RequireCoolFeature);
@@ -75,4 +76,10 @@ ITestServicePtr CreateTestService(
////////////////////////////////////////////////////////////////////////////////
+void ReleaseLatchedCalls();
+void MaybeInitLatch();
+void ResetLatch();
+
+////////////////////////////////////////////////////////////////////////////////
+
} // namespace NYT::NRpc
diff --git a/yt/yt/core/rpc/unittests/lib/test_service.proto b/yt/yt/core/rpc/unittests/lib/test_service.proto
index 8ef02ef1f5..d126bea68f 100644
--- a/yt/yt/core/rpc/unittests/lib/test_service.proto
+++ b/yt/yt/core/rpc/unittests/lib/test_service.proto
@@ -130,6 +130,17 @@ message TRspSlowCanceledCall
////////////////////////////////////////////////////////////////////////////////
+message TReqLatchedCall
+{
+ optional bool wait_on_latch = 1 [default = true];
+}
+
+message TRspLatchedCall
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
message TReqRequestBytesThrottledCall
{
}
diff --git a/yt/yt/core/rpc/unittests/rpc_ut.cpp b/yt/yt/core/rpc/unittests/rpc_ut.cpp
index 6b67d5f117..146a3cad28 100644
--- a/yt/yt/core/rpc/unittests/rpc_ut.cpp
+++ b/yt/yt/core/rpc/unittests/rpc_ut.cpp
@@ -787,6 +787,8 @@ TYPED_TEST(TRpcTest, SlowCall)
TYPED_TEST(TRpcTest, RequestQueueSizeLimit)
{
+ MaybeInitLatch();
+
std::vector<TFuture<void>> futures;
std::vector<TTestProxy> proxies;
@@ -798,7 +800,7 @@ TYPED_TEST(TRpcTest, RequestQueueSizeLimit)
}
for (int i = 0; i <= 30; ++i) {
- auto req = proxies[i].SlowCall();
+ auto req = proxies[i].LatchedCall();
futures.push_back(req->Invoke().AsVoid());
}
@@ -806,11 +808,14 @@ TYPED_TEST(TRpcTest, RequestQueueSizeLimit)
{
TTestProxy proxy(this->CreateChannel());
proxy.SetDefaultTimeout(TDuration::Seconds(60.0));
- auto req = proxy.SlowCall();
+ auto req = proxy.LatchedCall();
EXPECT_EQ(NRpc::EErrorCode::RequestQueueSizeLimitExceeded, req->Invoke().Get().GetCode());
}
+ ReleaseLatchedCalls();
EXPECT_TRUE(AllSucceeded(std::move(futures)).Get().IsOK());
+
+ ResetLatch();
}
TYPED_TEST(TNotGrpcTest, RequesMemoryPressureException)
@@ -968,19 +973,15 @@ TYPED_TEST(TNotGrpcTest, RequestQueueByteSizeLimit)
TYPED_TEST(TRpcTest, ConcurrencyLimit)
{
- auto shared_counter = std::make_shared<std::atomic<int>>(0);
+ MaybeInitLatch();
std::vector<TFuture<void>> futures;
for (int i = 0; i < 10; ++i) {
TTestProxy proxy(this->CreateChannel());
proxy.SetDefaultTimeout(TDuration::Seconds(10.0));
- auto req = proxy.SlowCall();
+ auto req = proxy.LatchedCall();
futures.push_back(
- req->Invoke()
- .AsVoid()
- .Apply(BIND([counter = shared_counter] {
- counter->fetch_add(1);
- })));
+ req->Invoke().AsVoid());
}
Sleep(TDuration::MilliSeconds(200));
@@ -988,18 +989,21 @@ TYPED_TEST(TRpcTest, ConcurrencyLimit)
TFuture<void> backlogFuture;
{
TTestProxy proxy(this->CreateChannel());
- auto req = proxy.SlowCall();
+ auto req = proxy.LatchedCall();
+ req->set_wait_on_latch(false);
backlogFuture =
- req->Invoke()
- .AsVoid()
- .Apply(BIND([counter = shared_counter] {
- EXPECT_EQ(counter->load(), 10);
- }));
+ req->Invoke().AsVoid();
}
+ Sleep(TDuration::Seconds(2));
+ EXPECT_FALSE(backlogFuture.IsSet());
+ ReleaseLatchedCalls();
+
EXPECT_TRUE(AllSucceeded(std::move(futures)).Get().IsOK());
EXPECT_TRUE(backlogFuture.Get().IsOK());
+
+ ResetLatch();
}
TYPED_TEST(TRpcTest, NoReply)