aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/grpc/server/ut
diff options
context:
space:
mode:
authorDevtools Arcadia <arcadia-devtools@yandex-team.ru>2022-02-07 18:08:42 +0300
committerDevtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net>2022-02-07 18:08:42 +0300
commit1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch)
treee26c9fed0de5d9873cce7e00bc214573dc2195b7 /library/cpp/grpc/server/ut
downloadydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'library/cpp/grpc/server/ut')
-rw-r--r--library/cpp/grpc/server/ut/grpc_response_ut.cpp88
-rw-r--r--library/cpp/grpc/server/ut/stream_adaptor_ut.cpp121
-rw-r--r--library/cpp/grpc/server/ut/ya.make21
3 files changed, 230 insertions, 0 deletions
diff --git a/library/cpp/grpc/server/ut/grpc_response_ut.cpp b/library/cpp/grpc/server/ut/grpc_response_ut.cpp
new file mode 100644
index 00000000000..8abc4e4e0ec
--- /dev/null
+++ b/library/cpp/grpc/server/ut/grpc_response_ut.cpp
@@ -0,0 +1,88 @@
+#include <library/cpp/grpc/server/grpc_response.h>
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <google/protobuf/duration.pb.h>
+#include <grpc++/impl/codegen/proto_utils.h>
+#include <grpc++/impl/grpc_library.h>
+
+static ::grpc::internal::GrpcLibraryInitializer grpcInitializer;
+
+using namespace NGrpc;
+
+using google::protobuf::Duration;
+
+Y_UNIT_TEST_SUITE(ResponseTest) {
+
+ template <typename T>
+ grpc::ByteBuffer Serialize(T resp) {
+ grpc::ByteBuffer buf;
+ bool ownBuf = false;
+ grpc::Status status = grpc::SerializationTraits<T>::Serialize(resp, &buf, &ownBuf);
+ UNIT_ASSERT(status.ok());
+ return buf;
+ }
+
+ template <typename T>
+ T Deserialize(grpc::ByteBuffer* buf) {
+ T message;
+ auto status = grpc::SerializationTraits<T>::Deserialize(buf, &message);
+ UNIT_ASSERT(status.ok());
+ return message;
+ }
+
+ Y_UNIT_TEST(UniversalResponseMsg) {
+ Duration d1;
+ d1.set_seconds(12345);
+ d1.set_nanos(67890);
+
+ auto buf = Serialize(TUniversalResponse<Duration>(&d1));
+ Duration d2 = Deserialize<Duration>(&buf);
+
+ UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345);
+ UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890);
+ }
+
+ Y_UNIT_TEST(UniversalResponseBuf) {
+ Duration d1;
+ d1.set_seconds(123);
+ d1.set_nanos(456);
+
+ TString data = d1.SerializeAsString();
+ grpc::Slice dataSlice{data.data(), data.size()};
+ grpc::ByteBuffer dataBuf{&dataSlice, 1};
+
+ auto buf = Serialize(TUniversalResponse<Duration>(&dataBuf));
+ Duration d2 = Deserialize<Duration>(&buf);
+
+ UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123);
+ UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456);
+ }
+
+ Y_UNIT_TEST(UniversalResponseRefMsg) {
+ Duration d1;
+ d1.set_seconds(12345);
+ d1.set_nanos(67890);
+
+ auto buf = Serialize(TUniversalResponseRef<Duration>(&d1));
+ Duration d2 = Deserialize<Duration>(&buf);
+
+ UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 12345);
+ UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 67890);
+ }
+
+ Y_UNIT_TEST(UniversalResponseRefBuf) {
+ Duration d1;
+ d1.set_seconds(123);
+ d1.set_nanos(456);
+
+ TString data = d1.SerializeAsString();
+ grpc::Slice dataSlice{data.data(), data.size()};
+ grpc::ByteBuffer dataBuf{&dataSlice, 1};
+
+ auto buf = Serialize(TUniversalResponseRef<Duration>(&dataBuf));
+ Duration d2 = Deserialize<Duration>(&buf);
+
+ UNIT_ASSERT_VALUES_EQUAL(d2.seconds(), 123);
+ UNIT_ASSERT_VALUES_EQUAL(d2.nanos(), 456);
+ }
+}
diff --git a/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
new file mode 100644
index 00000000000..c34d3b8c2bf
--- /dev/null
+++ b/library/cpp/grpc/server/ut/stream_adaptor_ut.cpp
@@ -0,0 +1,121 @@
+#include <library/cpp/grpc/server/grpc_request.h>
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/testing/unittest/tests_data.h>
+
+#include <util/system/thread.h>
+#include <util/thread/pool.h>
+
+using namespace NGrpc;
+
+// Here we emulate stream data producer
+class TOrderedProducer: public TThread {
+public:
+ TOrderedProducer(IStreamAdaptor* adaptor, ui64 max, bool withSleep, std::function<void(ui64)>&& consumerOp)
+ : TThread(&ThreadProc, this)
+ , Adaptor_(adaptor)
+ , Max_(max)
+ , WithSleep_(withSleep)
+ , ConsumerOp_(std::move(consumerOp))
+ {}
+
+ static void* ThreadProc(void* _this) {
+ SetCurrentThreadName("OrderedProducerThread");
+ static_cast<TOrderedProducer*>(_this)->Exec();
+ return nullptr;
+ }
+
+ void Exec() {
+ for (ui64 i = 0; i < Max_; i++) {
+ auto cb = [i, this]() mutable {
+ ConsumerOp_(i);
+ };
+ Adaptor_->Enqueue(std::move(cb), false);
+ if (WithSleep_ && (i % 256 == 0)) {
+ Sleep(TDuration::MilliSeconds(10));
+ }
+ }
+ }
+
+private:
+ IStreamAdaptor* Adaptor_;
+ const ui64 Max_;
+ const bool WithSleep_;
+ std::function<void(ui64)> ConsumerOp_;
+};
+
+Y_UNIT_TEST_SUITE(StreamAdaptor) {
+ static void OrderingTest(size_t threads, bool withSleep) {
+
+ auto adaptor = CreateStreamAdaptor();
+
+ const i64 max = 10000;
+
+ // Here we will emulate grpc stream (NextReply call after writing)
+ std::unique_ptr<IThreadPool> consumerQueue(new TThreadPool(TThreadPool::TParams().SetBlocking(false).SetCatching(false)));
+ // And make sure only one request inflight (see UNIT_ASSERT on adding to the queue)
+ consumerQueue->Start(threads, 1);
+
+ // Non atomic!!! Stream adaptor must protect us
+ ui64 curVal = 0;
+
+ // Used just to wait in the main thread
+ TAtomic finished = false;
+ auto consumerOp = [&finished, &curVal, ptr{adaptor.get()}, queue{consumerQueue.get()}](ui64 i) {
+ // Check no reordering inside stream adaptor
+ // and no simultanious consumer Op call
+ UNIT_ASSERT_VALUES_EQUAL(curVal, i);
+ curVal++;
+ // We must set finished flag after last ProcessNext, but we can`t compare curVal and max after ProcessNext
+ // so compare here and set after
+ bool tmp = curVal == max;
+ bool res = queue->AddFunc([ptr, &finished, tmp, &curVal, i]() {
+ // Additional check the value still same
+ // run under tsan makes sure no consumer Op call before we call ProcessNext
+ UNIT_ASSERT_VALUES_EQUAL(curVal, i + 1);
+ ptr->ProcessNext();
+ // Reordering after ProcessNext is possible, so check tmp and set finished to true
+ if (tmp)
+ AtomicSet(finished, true);
+ });
+ UNIT_ASSERT(res);
+ };
+
+ TOrderedProducer producer(adaptor.get(), max, withSleep, std::move(consumerOp));
+
+ producer.Start();
+ producer.Join();
+
+ while (!AtomicGet(finished))
+ {
+ Sleep(TDuration::MilliSeconds(100));
+ }
+
+ consumerQueue->Stop();
+
+ UNIT_ASSERT_VALUES_EQUAL(curVal, max);
+ }
+
+ Y_UNIT_TEST(OrderingOneThread) {
+ OrderingTest(1, false);
+ }
+
+ Y_UNIT_TEST(OrderingTwoThreads) {
+ OrderingTest(2, false);
+ }
+
+ Y_UNIT_TEST(OrderingManyThreads) {
+ OrderingTest(10, false);
+ }
+
+ Y_UNIT_TEST(OrderingOneThreadWithSleep) {
+ OrderingTest(1, true);
+ }
+
+ Y_UNIT_TEST(OrderingTwoThreadsWithSleep) {
+ OrderingTest(2, true);
+ }
+
+ Y_UNIT_TEST(OrderingManyThreadsWithSleep) {
+ OrderingTest(10, true);
+ }
+}
diff --git a/library/cpp/grpc/server/ut/ya.make b/library/cpp/grpc/server/ut/ya.make
new file mode 100644
index 00000000000..feb3291af92
--- /dev/null
+++ b/library/cpp/grpc/server/ut/ya.make
@@ -0,0 +1,21 @@
+UNITTEST_FOR(library/cpp/grpc/server)
+
+OWNER(
+ dcherednik
+ g:kikimr
+)
+
+TIMEOUT(600)
+SIZE(MEDIUM)
+
+PEERDIR(
+ library/cpp/grpc/server
+)
+
+SRCS(
+ grpc_response_ut.cpp
+ stream_adaptor_ut.cpp
+)
+
+END()
+