diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-09-18 14:37:21 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-09-18 14:56:06 +0300 |
commit | 9b26f8a1cd66768ee09d6a9b7a7913e813b30f6d (patch) | |
tree | 970bae605deb7a90a6eb1df21f4e76467e64e9da | |
parent | 4c3bb80b2d126d36d69d0f4842170025eb04914a (diff) | |
download | ydb-9b26f8a1cd66768ee09d6a9b7a7913e813b30f6d.tar.gz |
Intermediate changes
commit_hash:6fdf8d3d31a6b3c0ec54292ffcd6919b46d4f2a7
-rw-r--r-- | yt/yt/core/rpc/unittests/lib/test_service.cpp | 42 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/lib/test_service.h | 7 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/lib/test_service.proto | 11 | ||||
-rw-r--r-- | yt/yt/core/rpc/unittests/rpc_ut.cpp | 34 |
4 files changed, 79 insertions, 15 deletions
diff --git a/yt/yt/core/rpc/unittests/lib/test_service.cpp b/yt/yt/core/rpc/unittests/lib/test_service.cpp index f0994b55e3..afd60131da 100644 --- a/yt/yt/core/rpc/unittests/lib/test_service.cpp +++ b/yt/yt/core/rpc/unittests/lib/test_service.cpp @@ -20,6 +20,10 @@ using namespace NConcurrency; //////////////////////////////////////////////////////////////////////////////// +YT_DEFINE_GLOBAL(std::unique_ptr<NThreading::TEvent>, Latch_); + +//////////////////////////////////////////////////////////////////////////////// + class TTestService : public ITestService , public TServiceBase @@ -54,6 +58,10 @@ public: .SetQueueSizeLimit(20) .SetConcurrencyByteLimit(10_MB) .SetQueueByteSizeLimit(20_MB)); + RegisterMethod(RPC_SERVICE_METHOD_DESC(LatchedCall) + .SetCancelable(true) + .SetConcurrencyLimit(10) + .SetQueueSizeLimit(20)); RegisterMethod(RPC_SERVICE_METHOD_DESC(SlowCanceledCall) .SetCancelable(true)); RegisterMethod(RPC_SERVICE_METHOD_DESC(RequestBytesThrottledCall)); @@ -166,6 +174,15 @@ public: context->Reply(); } + DECLARE_RPC_SERVICE_METHOD(NTestRpc, LatchedCall) + { + context->SetRequestInfo(); + if (request->wait_on_latch()) { + Latch_()->Wait(); + } + context->Reply(); + } + DECLARE_RPC_SERVICE_METHOD(NTestRpc, SlowCanceledCall) { try { @@ -396,4 +413,29 @@ ITestServicePtr CreateTestService( //////////////////////////////////////////////////////////////////////////////// +void ReleaseLatchedCalls() +{ + if (!Latch_()) { + return; + } + + Latch_()->NotifyAll(); +} + +void MaybeInitLatch() +{ + if (!Latch_()) { + Latch_() = std::make_unique<NThreading::TEvent>(); + } +} + +void ResetLatch() +{ + if (Latch_()) { + Latch_().reset(); + } +} + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/unittests/lib/test_service.h b/yt/yt/core/rpc/unittests/lib/test_service.h index 3bc224eee8..bfb9ebd149 100644 --- a/yt/yt/core/rpc/unittests/lib/test_service.h +++ b/yt/yt/core/rpc/unittests/lib/test_service.h @@ -33,6 +33,7 @@ public: DEFINE_RPC_PROXY_METHOD(NTestRpc, NotRegistered); DEFINE_RPC_PROXY_METHOD(NTestRpc, SlowCall); DEFINE_RPC_PROXY_METHOD(NTestRpc, SlowCanceledCall); + DEFINE_RPC_PROXY_METHOD(NTestRpc, LatchedCall); DEFINE_RPC_PROXY_METHOD(NTestRpc, NoReply); DEFINE_RPC_PROXY_METHOD(NTestRpc, FlakyCall); DEFINE_RPC_PROXY_METHOD(NTestRpc, RequireCoolFeature); @@ -75,4 +76,10 @@ ITestServicePtr CreateTestService( //////////////////////////////////////////////////////////////////////////////// +void ReleaseLatchedCalls(); +void MaybeInitLatch(); +void ResetLatch(); + +//////////////////////////////////////////////////////////////////////////////// + } // namespace NYT::NRpc diff --git a/yt/yt/core/rpc/unittests/lib/test_service.proto b/yt/yt/core/rpc/unittests/lib/test_service.proto index 8ef02ef1f5..d126bea68f 100644 --- a/yt/yt/core/rpc/unittests/lib/test_service.proto +++ b/yt/yt/core/rpc/unittests/lib/test_service.proto @@ -130,6 +130,17 @@ message TRspSlowCanceledCall //////////////////////////////////////////////////////////////////////////////// +message TReqLatchedCall +{ + optional bool wait_on_latch = 1 [default = true]; +} + +message TRspLatchedCall +{ +} + +//////////////////////////////////////////////////////////////////////////////// + message TReqRequestBytesThrottledCall { } diff --git a/yt/yt/core/rpc/unittests/rpc_ut.cpp b/yt/yt/core/rpc/unittests/rpc_ut.cpp index 6b67d5f117..146a3cad28 100644 --- a/yt/yt/core/rpc/unittests/rpc_ut.cpp +++ b/yt/yt/core/rpc/unittests/rpc_ut.cpp @@ -787,6 +787,8 @@ TYPED_TEST(TRpcTest, SlowCall) TYPED_TEST(TRpcTest, RequestQueueSizeLimit) { + MaybeInitLatch(); + std::vector<TFuture<void>> futures; std::vector<TTestProxy> proxies; @@ -798,7 +800,7 @@ TYPED_TEST(TRpcTest, RequestQueueSizeLimit) } for (int i = 0; i <= 30; ++i) { - auto req = proxies[i].SlowCall(); + auto req = proxies[i].LatchedCall(); futures.push_back(req->Invoke().AsVoid()); } @@ -806,11 +808,14 @@ TYPED_TEST(TRpcTest, RequestQueueSizeLimit) { TTestProxy proxy(this->CreateChannel()); proxy.SetDefaultTimeout(TDuration::Seconds(60.0)); - auto req = proxy.SlowCall(); + auto req = proxy.LatchedCall(); EXPECT_EQ(NRpc::EErrorCode::RequestQueueSizeLimitExceeded, req->Invoke().Get().GetCode()); } + ReleaseLatchedCalls(); EXPECT_TRUE(AllSucceeded(std::move(futures)).Get().IsOK()); + + ResetLatch(); } TYPED_TEST(TNotGrpcTest, RequesMemoryPressureException) @@ -968,19 +973,15 @@ TYPED_TEST(TNotGrpcTest, RequestQueueByteSizeLimit) TYPED_TEST(TRpcTest, ConcurrencyLimit) { - auto shared_counter = std::make_shared<std::atomic<int>>(0); + MaybeInitLatch(); std::vector<TFuture<void>> futures; for (int i = 0; i < 10; ++i) { TTestProxy proxy(this->CreateChannel()); proxy.SetDefaultTimeout(TDuration::Seconds(10.0)); - auto req = proxy.SlowCall(); + auto req = proxy.LatchedCall(); futures.push_back( - req->Invoke() - .AsVoid() - .Apply(BIND([counter = shared_counter] { - counter->fetch_add(1); - }))); + req->Invoke().AsVoid()); } Sleep(TDuration::MilliSeconds(200)); @@ -988,18 +989,21 @@ TYPED_TEST(TRpcTest, ConcurrencyLimit) TFuture<void> backlogFuture; { TTestProxy proxy(this->CreateChannel()); - auto req = proxy.SlowCall(); + auto req = proxy.LatchedCall(); + req->set_wait_on_latch(false); backlogFuture = - req->Invoke() - .AsVoid() - .Apply(BIND([counter = shared_counter] { - EXPECT_EQ(counter->load(), 10); - })); + req->Invoke().AsVoid(); } + Sleep(TDuration::Seconds(2)); + EXPECT_FALSE(backlogFuture.IsSet()); + ReleaseLatchedCalls(); + EXPECT_TRUE(AllSucceeded(std::move(futures)).Get().IsOK()); EXPECT_TRUE(backlogFuture.Get().IsOK()); + + ResetLatch(); } TYPED_TEST(TRpcTest, NoReply) |