diff options
author | dvshkurko <dvshkurko@yandex-team.ru> | 2022-02-10 16:45:52 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:52 +0300 |
commit | c768a99151e47c3a4bb7b92c514d256abd301c4d (patch) | |
tree | 1a2c5ffcf89eb53ecd79dbc9bc0a195c27404d0c /contrib/libs/grpc/test/cpp/end2end/nonblocking_test.cc | |
parent | 321ee9bce31ec6e238be26dbcbe539cffa2c3309 (diff) | |
download | ydb-c768a99151e47c3a4bb7b92c514d256abd301c4d.tar.gz |
Restoring authorship annotation for <dvshkurko@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/grpc/test/cpp/end2end/nonblocking_test.cc')
-rw-r--r-- | contrib/libs/grpc/test/cpp/end2end/nonblocking_test.cc | 380 |
1 files changed, 190 insertions, 190 deletions
diff --git a/contrib/libs/grpc/test/cpp/end2end/nonblocking_test.cc b/contrib/libs/grpc/test/cpp/end2end/nonblocking_test.cc index 37e696adaa..4be070ec71 100644 --- a/contrib/libs/grpc/test/cpp/end2end/nonblocking_test.cc +++ b/contrib/libs/grpc/test/cpp/end2end/nonblocking_test.cc @@ -1,203 +1,203 @@ -/* - * - * Copyright 2018 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <memory> - -#include <grpcpp/channel.h> -#include <grpcpp/client_context.h> -#include <grpcpp/create_channel.h> -#include <grpcpp/server.h> -#include <grpcpp/server_builder.h> -#include <grpcpp/server_context.h> - -#include "src/core/lib/gpr/tls.h" -#include "src/core/lib/iomgr/port.h" -#include "src/proto/grpc/testing/echo.grpc.pb.h" -#include "test/core/util/port.h" -#include "test/core/util/test_config.h" - -#ifdef GRPC_POSIX_SOCKET -#include "src/core/lib/iomgr/ev_posix.h" -#endif // GRPC_POSIX_SOCKET - -#include <gtest/gtest.h> - -#ifdef GRPC_POSIX_SOCKET -// Thread-local variable to so that only polls from this test assert +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <memory> + +#include <grpcpp/channel.h> +#include <grpcpp/client_context.h> +#include <grpcpp/create_channel.h> +#include <grpcpp/server.h> +#include <grpcpp/server_builder.h> +#include <grpcpp/server_context.h> + +#include "src/core/lib/gpr/tls.h" +#include "src/core/lib/iomgr/port.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" + +#ifdef GRPC_POSIX_SOCKET +#include "src/core/lib/iomgr/ev_posix.h" +#endif // GRPC_POSIX_SOCKET + +#include <gtest/gtest.h> + +#ifdef GRPC_POSIX_SOCKET +// Thread-local variable to so that only polls from this test assert // non-blocking (not polls from resolver, timer thread, etc), and only when the // thread is waiting on polls caused by CompletionQueue::AsyncNext (not for // picking a port or other reasons). GPR_TLS_DECL(g_is_nonblocking_poll); - -namespace { - -int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds, - int timeout) { + +namespace { + +int maybe_assert_non_blocking_poll(struct pollfd* pfds, nfds_t nfds, + int timeout) { // Only assert that this poll should have zero timeout if we're in the // middle of a zero-timeout CQ Next. if (gpr_tls_get(&g_is_nonblocking_poll)) { - GPR_ASSERT(timeout == 0); - } - return poll(pfds, nfds, timeout); -} - -} // namespace - -namespace grpc { -namespace testing { -namespace { - -void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); } -int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); } - -class NonblockingTest : public ::testing::Test { - protected: - NonblockingTest() {} - - void SetUp() override { - port_ = grpc_pick_unused_port_or_die(); - server_address_ << "localhost:" << port_; - - // Setup server - BuildAndStartServer(); - } - - bool LoopForTag(void** tag, bool* ok) { + GPR_ASSERT(timeout == 0); + } + return poll(pfds, nfds, timeout); +} + +} // namespace + +namespace grpc { +namespace testing { +namespace { + +void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); } +int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); } + +class NonblockingTest : public ::testing::Test { + protected: + NonblockingTest() {} + + void SetUp() override { + port_ = grpc_pick_unused_port_or_die(); + server_address_ << "localhost:" << port_; + + // Setup server + BuildAndStartServer(); + } + + bool LoopForTag(void** tag, bool* ok) { // Temporarily set the thread-local nonblocking poll flag so that the polls // caused by this loop are indeed sent by the library with zero timeout. intptr_t orig_val = gpr_tls_get(&g_is_nonblocking_poll); gpr_tls_set(&g_is_nonblocking_poll, static_cast<intptr_t>(true)); - for (;;) { - auto r = cq_->AsyncNext(tag, ok, gpr_time_0(GPR_CLOCK_REALTIME)); - if (r == CompletionQueue::SHUTDOWN) { + for (;;) { + auto r = cq_->AsyncNext(tag, ok, gpr_time_0(GPR_CLOCK_REALTIME)); + if (r == CompletionQueue::SHUTDOWN) { gpr_tls_set(&g_is_nonblocking_poll, orig_val); - return false; - } else if (r == CompletionQueue::GOT_EVENT) { + return false; + } else if (r == CompletionQueue::GOT_EVENT) { gpr_tls_set(&g_is_nonblocking_poll, orig_val); - return true; - } - } - } - - void TearDown() override { - server_->Shutdown(); - void* ignored_tag; - bool ignored_ok; - cq_->Shutdown(); - while (LoopForTag(&ignored_tag, &ignored_ok)) - ; - stub_.reset(); - grpc_recycle_unused_port(port_); - } - - void BuildAndStartServer() { - ServerBuilder builder; - builder.AddListeningPort(server_address_.str(), - grpc::InsecureServerCredentials()); - service_.reset(new grpc::testing::EchoTestService::AsyncService()); - builder.RegisterService(service_.get()); - cq_ = builder.AddCompletionQueue(); - server_ = builder.BuildAndStart(); - } - - void ResetStub() { - std::shared_ptr<Channel> channel = grpc::CreateChannel( - server_address_.str(), grpc::InsecureChannelCredentials()); - stub_ = grpc::testing::EchoTestService::NewStub(channel); - } - - void SendRpc(int num_rpcs) { - for (int i = 0; i < num_rpcs; i++) { - EchoRequest send_request; - EchoRequest recv_request; - EchoResponse send_response; - EchoResponse recv_response; - Status recv_status; - - ClientContext cli_ctx; - ServerContext srv_ctx; - grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); - - send_request.set_message("hello non-blocking world"); - std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( - stub_->PrepareAsyncEcho(&cli_ctx, send_request, cq_.get())); - - response_reader->StartCall(); - response_reader->Finish(&recv_response, &recv_status, tag(4)); - - service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, - cq_.get(), cq_.get(), tag(2)); - - void* got_tag; - bool ok; - EXPECT_TRUE(LoopForTag(&got_tag, &ok)); - EXPECT_TRUE(ok); - EXPECT_EQ(detag(got_tag), 2); - EXPECT_EQ(send_request.message(), recv_request.message()); - - send_response.set_message(recv_request.message()); - response_writer.Finish(send_response, Status::OK, tag(3)); - - int tagsum = 0; - int tagprod = 1; - EXPECT_TRUE(LoopForTag(&got_tag, &ok)); - EXPECT_TRUE(ok); - tagsum += detag(got_tag); - tagprod *= detag(got_tag); - - EXPECT_TRUE(LoopForTag(&got_tag, &ok)); - EXPECT_TRUE(ok); - tagsum += detag(got_tag); - tagprod *= detag(got_tag); - - EXPECT_EQ(tagsum, 7); - EXPECT_EQ(tagprod, 12); - EXPECT_EQ(send_response.message(), recv_response.message()); - EXPECT_TRUE(recv_status.ok()); - } - } - - std::unique_ptr<ServerCompletionQueue> cq_; - std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; - std::unique_ptr<Server> server_; - std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_; - std::ostringstream server_address_; - int port_; -}; - -TEST_F(NonblockingTest, SimpleRpc) { - ResetStub(); - SendRpc(10); -} - -} // namespace -} // namespace testing -} // namespace grpc - -#endif // GRPC_POSIX_SOCKET - -int main(int argc, char** argv) { -#ifdef GRPC_POSIX_SOCKET - // Override the poll function before anything else can happen - grpc_poll_function = maybe_assert_non_blocking_poll; - - grpc::testing::TestEnvironment env(argc, argv); - ::testing::InitGoogleTest(&argc, argv); + return true; + } + } + } + + void TearDown() override { + server_->Shutdown(); + void* ignored_tag; + bool ignored_ok; + cq_->Shutdown(); + while (LoopForTag(&ignored_tag, &ignored_ok)) + ; + stub_.reset(); + grpc_recycle_unused_port(port_); + } + + void BuildAndStartServer() { + ServerBuilder builder; + builder.AddListeningPort(server_address_.str(), + grpc::InsecureServerCredentials()); + service_.reset(new grpc::testing::EchoTestService::AsyncService()); + builder.RegisterService(service_.get()); + cq_ = builder.AddCompletionQueue(); + server_ = builder.BuildAndStart(); + } + + void ResetStub() { + std::shared_ptr<Channel> channel = grpc::CreateChannel( + server_address_.str(), grpc::InsecureChannelCredentials()); + stub_ = grpc::testing::EchoTestService::NewStub(channel); + } + + void SendRpc(int num_rpcs) { + for (int i = 0; i < num_rpcs; i++) { + EchoRequest send_request; + EchoRequest recv_request; + EchoResponse send_response; + EchoResponse recv_response; + Status recv_status; + + ClientContext cli_ctx; + ServerContext srv_ctx; + grpc::ServerAsyncResponseWriter<EchoResponse> response_writer(&srv_ctx); + + send_request.set_message("hello non-blocking world"); + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader( + stub_->PrepareAsyncEcho(&cli_ctx, send_request, cq_.get())); + + response_reader->StartCall(); + response_reader->Finish(&recv_response, &recv_status, tag(4)); + + service_->RequestEcho(&srv_ctx, &recv_request, &response_writer, + cq_.get(), cq_.get(), tag(2)); + + void* got_tag; + bool ok; + EXPECT_TRUE(LoopForTag(&got_tag, &ok)); + EXPECT_TRUE(ok); + EXPECT_EQ(detag(got_tag), 2); + EXPECT_EQ(send_request.message(), recv_request.message()); + + send_response.set_message(recv_request.message()); + response_writer.Finish(send_response, Status::OK, tag(3)); + + int tagsum = 0; + int tagprod = 1; + EXPECT_TRUE(LoopForTag(&got_tag, &ok)); + EXPECT_TRUE(ok); + tagsum += detag(got_tag); + tagprod *= detag(got_tag); + + EXPECT_TRUE(LoopForTag(&got_tag, &ok)); + EXPECT_TRUE(ok); + tagsum += detag(got_tag); + tagprod *= detag(got_tag); + + EXPECT_EQ(tagsum, 7); + EXPECT_EQ(tagprod, 12); + EXPECT_EQ(send_response.message(), recv_response.message()); + EXPECT_TRUE(recv_status.ok()); + } + } + + std::unique_ptr<ServerCompletionQueue> cq_; + std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; + std::unique_ptr<Server> server_; + std::unique_ptr<grpc::testing::EchoTestService::AsyncService> service_; + std::ostringstream server_address_; + int port_; +}; + +TEST_F(NonblockingTest, SimpleRpc) { + ResetStub(); + SendRpc(10); +} + +} // namespace +} // namespace testing +} // namespace grpc + +#endif // GRPC_POSIX_SOCKET + +int main(int argc, char** argv) { +#ifdef GRPC_POSIX_SOCKET + // Override the poll function before anything else can happen + grpc_poll_function = maybe_assert_non_blocking_poll; + + grpc::testing::TestEnvironment env(argc, argv); + ::testing::InitGoogleTest(&argc, argv); gpr_tls_init(&g_is_nonblocking_poll); // Start the nonblocking poll thread-local variable as false because the @@ -205,10 +205,10 @@ int main(int argc, char** argv) { // timeout). gpr_tls_set(&g_is_nonblocking_poll, static_cast<intptr_t>(false)); - int ret = RUN_ALL_TESTS(); + int ret = RUN_ALL_TESTS(); gpr_tls_destroy(&g_is_nonblocking_poll); - return ret; + return ret; #else // GRPC_POSIX_SOCKET return 0; #endif // GRPC_POSIX_SOCKET -} +} |