aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:17 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:50:17 +0300
commit4b11037e5a7d071c63e3c966199fe7102e6462e4 (patch)
tree5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
parent17e20fa084178ddcb16255f974dbde74fb93608b (diff)
downloadydb-4b11037e5a7d071c63e3c966199fe7102e6462e4.tar.gz
Restoring authorship annotation for Daniil Cherednik <dcherednik@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server/ut/stream_adaptor_ut.cpp')
-rw-r--r--library/cpp/grpc/server/ut/stream_adaptor_ut.cpp230
1 files changed, 115 insertions, 115 deletions
diff --git a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
index 0840c77176..c34d3b8c2b 100644
--- a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
+++ b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
@@ -1,121 +1,121 @@
#include <library/cpp/grpc/server/grpc_request.h>
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/testing/unittest/tests_data.h>
-
-#include <util/system/thread.h>
-#include <util/thread/pool.h>
-
+
+#include <util/system/thread.h>
+#include <util/thread/pool.h>
+
using namespace NGrpc;
-
-// Here we emulate stream data producer
+
+// Here we emulate stream data producer
class TOrderedProducer: public TThread {
-public:
- TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp)
- : TThread(&ThreadProc, this)
- , Adaptor_(adaptor)
- , Max_(max)
- , WithSleep_(withSleep)
- , ConsumerOp_(std::move(consumerOp))
- {}
-
- static void* ThreadProc(void* _this) {
- SetCurrentThreadName("OrderedProducerThread");
- static_cast<TOrderedProducer*>(_this)->Exec();
- return nullptr;
- }
-
- void Exec() {
- for (ui64 i = 0; i < Max_; i++) {
- auto cb = [i, this]() mutable {
- ConsumerOp_(i);
- };
- Adaptor_->Enqueue(std::move(cb), false);
- if (WithSleep_ && (i % 256 == 0)) {
- Sleep(TDuration::MilliSeconds(10));
- }
- }
- }
-
-private:
- IStreamAdaptor* Adaptor_;
- const ui64 Max_;
- const bool WithSleep_;
- std::function<void(ui64)> ConsumerOp_;
-};
-
-Y_UNIT_TEST_SUITE(StreamAdaptor) {
- static void OrderingTest(size_t threads, bool withSleep) {
-
- auto adaptor = CreateStreamAdaptor();
-
- const i64 max = 10000;
-
- // Here we will emulate grpc stream (NextReply call after writing)
+public:
+ TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp)
+ : TThread(&ThreadProc, this)
+ , Adaptor_(adaptor)
+ , Max_(max)
+ , WithSleep_(withSleep)
+ , ConsumerOp_(std::move(consumerOp))
+ {}
+
+ static void* ThreadProc(void* _this) {
+ SetCurrentThreadName("OrderedProducerThread");
+ static_cast<TOrderedProducer*>(_this)->Exec();
+ return nullptr;
+ }
+
+ void Exec() {
+ for (ui64 i = 0; i < Max_; i++) {
+ auto cb = [i, this]() mutable {
+ ConsumerOp_(i);
+ };
+ Adaptor_->Enqueue(std::move(cb), false);
+ if (WithSleep_ && (i % 256 == 0)) {
+ Sleep(TDuration::MilliSeconds(10));
+ }
+ }
+ }
+
+private:
+ IStreamAdaptor* Adaptor_;
+ const ui64 Max_;
+ const bool WithSleep_;
+ std::function<void(ui64)> ConsumerOp_;
+};
+
+Y_UNIT_TEST_SUITE(StreamAdaptor) {
+ static void OrderingTest(size_t threads, bool withSleep) {
+
+ auto adaptor = CreateStreamAdaptor();
+
+ const i64 max = 10000;
+
+ // Here we will emulate grpc stream (NextReply call after writing)
std::unique_ptr<IThreadPool> consumerQueue(new TThreadPool(TThreadPool::TParams().SetBlocking(false).SetCatching(false)));
- // And make sure only one request inflight (see UNIT_ASSERT on adding to the queue)
- consumerQueue->Start(threads, 1);
-
- // Non atomic!!! Stream adaptor must protect us
- ui64 curVal = 0;
-
- // Used just to wait in the main thread
- TAtomic finished = false;
- auto consumerOp = [&finished, &curVal, ptr{adaptor.get()}, queue{consumerQueue.get()}](ui64 i) {
- // Check no reordering inside stream adaptor
- // and no simultanious consumer Op call
- UNIT_ASSERT_VALUES_EQUAL(curVal, i);
- curVal++;
- // We must set finished flag after last ProcessNext, but we can`t compare curVal and max after ProcessNext
- // so compare here and set after
- bool tmp = curVal == max;
- bool res = queue->AddFunc([ptr, &finished, tmp, &curVal, i]() {
- // Additional check the value still same
- // run under tsan makes sure no consumer Op call before we call ProcessNext
- UNIT_ASSERT_VALUES_EQUAL(curVal, i + 1);
- ptr->ProcessNext();
- // Reordering after ProcessNext is possible, so check tmp and set finished to true
- if (tmp)
- AtomicSet(finished, true);
- });
- UNIT_ASSERT(res);
- };
-
- TOrderedProducer producer(adaptor.get(), max, withSleep, std::move(consumerOp));
-
- producer.Start();
- producer.Join();
-
- while (!AtomicGet(finished))
- {
- Sleep(TDuration::MilliSeconds(100));
- }
-
- consumerQueue->Stop();
-
- UNIT_ASSERT_VALUES_EQUAL(curVal, max);
- }
-
- Y_UNIT_TEST(OrderingOneThread) {
- OrderingTest(1, false);
- }
-
- Y_UNIT_TEST(OrderingTwoThreads) {
- OrderingTest(2, false);
- }
-
- Y_UNIT_TEST(OrderingManyThreads) {
- OrderingTest(10, false);
- }
-
- Y_UNIT_TEST(OrderingOneThreadWithSleep) {
- OrderingTest(1, true);
- }
-
- Y_UNIT_TEST(OrderingTwoThreadsWithSleep) {
- OrderingTest(2, true);
- }
-
- Y_UNIT_TEST(OrderingManyThreadsWithSleep) {
- OrderingTest(10, true);
- }
-}
+ // And make sure only one request inflight (see UNIT_ASSERT on adding to the queue)
+ consumerQueue->Start(threads, 1);
+
+ // Non atomic!!! Stream adaptor must protect us
+ ui64 curVal = 0;
+
+ // Used just to wait in the main thread
+ TAtomic finished = false;
+ auto consumerOp = [&finished, &curVal, ptr{adaptor.get()}, queue{consumerQueue.get()}](ui64 i) {
+ // Check no reordering inside stream adaptor
+ // and no simultanious consumer Op call
+ UNIT_ASSERT_VALUES_EQUAL(curVal, i);
+ curVal++;
+ // We must set finished flag after last ProcessNext, but we can`t compare curVal and max after ProcessNext
+ // so compare here and set after
+ bool tmp = curVal == max;
+ bool res = queue->AddFunc([ptr, &finished, tmp, &curVal, i]() {
+ // Additional check the value still same
+ // run under tsan makes sure no consumer Op call before we call ProcessNext
+ UNIT_ASSERT_VALUES_EQUAL(curVal, i + 1);
+ ptr->ProcessNext();
+ // Reordering after ProcessNext is possible, so check tmp and set finished to true
+ if (tmp)
+ AtomicSet(finished, true);
+ });
+ UNIT_ASSERT(res);
+ };
+
+ TOrderedProducer producer(adaptor.get(), max, withSleep, std::move(consumerOp));
+
+ producer.Start();
+ producer.Join();
+
+ while (!AtomicGet(finished))
+ {
+ Sleep(TDuration::MilliSeconds(100));
+ }
+
+ consumerQueue->Stop();
+
+ UNIT_ASSERT_VALUES_EQUAL(curVal, max);
+ }
+
+ Y_UNIT_TEST(OrderingOneThread) {
+ OrderingTest(1, false);
+ }
+
+ Y_UNIT_TEST(OrderingTwoThreads) {
+ OrderingTest(2, false);
+ }
+
+ Y_UNIT_TEST(OrderingManyThreads) {
+ OrderingTest(10, false);
+ }
+
+ Y_UNIT_TEST(OrderingOneThreadWithSleep) {
+ OrderingTest(1, true);
+ }
+
+ Y_UNIT_TEST(OrderingTwoThreadsWithSleep) {
+ OrderingTest(2, true);
+ }
+
+ Y_UNIT_TEST(OrderingManyThreadsWithSleep) {
+ OrderingTest(10, true);
+ }
+}