diff options
author | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:17 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:50:17 +0300 |
commit | 4b11037e5a7d071c63e3c966199fe7102e6462e4 (patch) | |
tree | 5d5cb817648f650d76cf1076100726fd9b8448e8 /library/cpp/grpc/server/ut/stream_adaptor_ut.cpp | |
parent | 17e20fa084178ddcb16255f974dbde74fb93608b (diff) | |
download | ydb-4b11037e5a7d071c63e3c966199fe7102e6462e4.tar.gz |
Restoring authorship annotation for Daniil Cherednik <dcherednik@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'library/cpp/grpc/server/ut/stream_adaptor_ut.cpp')
-rw-r--r-- | library/cpp/grpc/server/ut/stream_adaptor_ut.cpp | 230 |
1 files changed, 115 insertions, 115 deletions
diff --git a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp index 0840c77176..c34d3b8c2b 100644 --- a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp +++ b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp @@ -1,121 +1,121 @@ #include <library/cpp/grpc/server/grpc_request.h> #include <library/cpp/testing/unittest/registar.h> #include <library/cpp/testing/unittest/tests_data.h> - -#include <util/system/thread.h> -#include <util/thread/pool.h> - + +#include <util/system/thread.h> +#include <util/thread/pool.h> + using namespace NGrpc; - -// Here we emulate stream data producer + +// Here we emulate stream data producer class TOrderedProducer: public TThread { -public: - TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp) - : TThread(&ThreadProc, this) - , Adaptor_(adaptor) - , Max_(max) - , WithSleep_(withSleep) - , ConsumerOp_(std::move(consumerOp)) - {} - - static void* ThreadProc(void* _this) { - SetCurrentThreadName("OrderedProducerThread"); - static_cast<TOrderedProducer*>(_this)->Exec(); - return nullptr; - } - - void Exec() { - for (ui64 i = 0; i < Max_; i++) { - auto cb = [i, this]() mutable { - ConsumerOp_(i); - }; - Adaptor_->Enqueue(std::move(cb), false); - if (WithSleep_ && (i % 256 == 0)) { - Sleep(TDuration::MilliSeconds(10)); - } - } - } - -private: - IStreamAdaptor* Adaptor_; - const ui64 Max_; - const bool WithSleep_; - std::function<void(ui64)> ConsumerOp_; -}; - -Y_UNIT_TEST_SUITE(StreamAdaptor) { - static void OrderingTest(size_t threads, bool withSleep) { - - auto adaptor = CreateStreamAdaptor(); - - const i64 max = 10000; - - // Here we will emulate grpc stream (NextReply call after writing) +public: + TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp) + : TThread(&ThreadProc, this) + , Adaptor_(adaptor) + , Max_(max) + , WithSleep_(withSleep) + , ConsumerOp_(std::move(consumerOp)) + {} + + static void* ThreadProc(void* _this) { + SetCurrentThreadName("OrderedProducerThread"); + static_cast<TOrderedProducer*>(_this)->Exec(); + return nullptr; + } + + void Exec() { + for (ui64 i = 0; i < Max_; i++) { + auto cb = [i, this]() mutable { + ConsumerOp_(i); + }; + Adaptor_->Enqueue(std::move(cb), false); + if (WithSleep_ && (i % 256 == 0)) { + Sleep(TDuration::MilliSeconds(10)); + } + } + } + +private: + IStreamAdaptor* Adaptor_; + const ui64 Max_; + const bool WithSleep_; + std::function<void(ui64)> ConsumerOp_; +}; + +Y_UNIT_TEST_SUITE(StreamAdaptor) { + static void OrderingTest(size_t threads, bool withSleep) { + + auto adaptor = CreateStreamAdaptor(); + + const i64 max = 10000; + + // Here we will emulate grpc stream (NextReply call after writing) std::unique_ptr<IThreadPool> consumerQueue(new TThreadPool(TThreadPool::TParams().SetBlocking(false).SetCatching(false))); - // And make sure only one request inflight (see UNIT_ASSERT on adding to the queue) - consumerQueue->Start(threads, 1); - - // Non atomic!!! Stream adaptor must protect us - ui64 curVal = 0; - - // Used just to wait in the main thread - TAtomic finished = false; - auto consumerOp = [&finished, &curVal, ptr{adaptor.get()}, queue{consumerQueue.get()}](ui64 i) { - // Check no reordering inside stream adaptor - // and no simultanious consumer Op call - UNIT_ASSERT_VALUES_EQUAL(curVal, i); - curVal++; - // We must set finished flag after last ProcessNext, but we can`t compare curVal and max after ProcessNext - // so compare here and set after - bool tmp = curVal == max; - bool res = queue->AddFunc([ptr, &finished, tmp, &curVal, i]() { - // Additional check the value still same - // run under tsan makes sure no consumer Op call before we call ProcessNext - UNIT_ASSERT_VALUES_EQUAL(curVal, i + 1); - ptr->ProcessNext(); - // Reordering after ProcessNext is possible, so check tmp and set finished to true - if (tmp) - AtomicSet(finished, true); - }); - UNIT_ASSERT(res); - }; - - TOrderedProducer producer(adaptor.get(), max, withSleep, std::move(consumerOp)); - - producer.Start(); - producer.Join(); - - while (!AtomicGet(finished)) - { - Sleep(TDuration::MilliSeconds(100)); - } - - consumerQueue->Stop(); - - UNIT_ASSERT_VALUES_EQUAL(curVal, max); - } - - Y_UNIT_TEST(OrderingOneThread) { - OrderingTest(1, false); - } - - Y_UNIT_TEST(OrderingTwoThreads) { - OrderingTest(2, false); - } - - Y_UNIT_TEST(OrderingManyThreads) { - OrderingTest(10, false); - } - - Y_UNIT_TEST(OrderingOneThreadWithSleep) { - OrderingTest(1, true); - } - - Y_UNIT_TEST(OrderingTwoThreadsWithSleep) { - OrderingTest(2, true); - } - - Y_UNIT_TEST(OrderingManyThreadsWithSleep) { - OrderingTest(10, true); - } -} + // And make sure only one request inflight (see UNIT_ASSERT on adding to the queue) + consumerQueue->Start(threads, 1); + + // Non atomic!!! Stream adaptor must protect us + ui64 curVal = 0; + + // Used just to wait in the main thread + TAtomic finished = false; + auto consumerOp = [&finished, &curVal, ptr{adaptor.get()}, queue{consumerQueue.get()}](ui64 i) { + // Check no reordering inside stream adaptor + // and no simultanious consumer Op call + UNIT_ASSERT_VALUES_EQUAL(curVal, i); + curVal++; + // We must set finished flag after last ProcessNext, but we can`t compare curVal and max after ProcessNext + // so compare here and set after + bool tmp = curVal == max; + bool res = queue->AddFunc([ptr, &finished, tmp, &curVal, i]() { + // Additional check the value still same + // run under tsan makes sure no consumer Op call before we call ProcessNext + UNIT_ASSERT_VALUES_EQUAL(curVal, i + 1); + ptr->ProcessNext(); + // Reordering after ProcessNext is possible, so check tmp and set finished to true + if (tmp) + AtomicSet(finished, true); + }); + UNIT_ASSERT(res); + }; + + TOrderedProducer producer(adaptor.get(), max, withSleep, std::move(consumerOp)); + + producer.Start(); + producer.Join(); + + while (!AtomicGet(finished)) + { + Sleep(TDuration::MilliSeconds(100)); + } + + consumerQueue->Stop(); + + UNIT_ASSERT_VALUES_EQUAL(curVal, max); + } + + Y_UNIT_TEST(OrderingOneThread) { + OrderingTest(1, false); + } + + Y_UNIT_TEST(OrderingTwoThreads) { + OrderingTest(2, false); + } + + Y_UNIT_TEST(OrderingManyThreads) { + OrderingTest(10, false); + } + + Y_UNIT_TEST(OrderingOneThreadWithSleep) { + OrderingTest(1, true); + } + + Y_UNIT_TEST(OrderingTwoThreadsWithSleep) { + OrderingTest(2, true); + } + + Y_UNIT_TEST(OrderingManyThreadsWithSleep) { + OrderingTest(10, true); + } +} |