diff options
author | heretic <heretic@yandex-team.ru> | 2022-02-10 16:45:46 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:46 +0300 |
commit | 81eddc8c0b55990194e112b02d127b87d54164a9 (patch) | |
tree | 9142afc54d335ea52910662635b898e79e192e49 /contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc | |
parent | 397cbe258b9e064f49c4ca575279f02f39fef76e (diff) | |
download | ydb-81eddc8c0b55990194e112b02d127b87d54164a9.tar.gz |
Restoring authorship annotation for <heretic@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc')
-rw-r--r-- | contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc | 586 |
1 files changed, 293 insertions, 293 deletions
diff --git a/contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc b/contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc index 467f482d3f..12cb40a953 100644 --- a/contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc +++ b/contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc @@ -25,15 +25,15 @@ #include <grpcpp/server_builder.h> #include <grpcpp/server_context.h> #include <grpcpp/support/client_callback.h> -#include <gtest/gtest.h> - -#include <algorithm> -#include <condition_variable> -#include <functional> -#include <mutex> -#include <sstream> -#include <thread> - +#include <gtest/gtest.h> + +#include <algorithm> +#include <condition_variable> +#include <functional> +#include <mutex> +#include <sstream> +#include <thread> + #include "src/core/lib/gpr/env.h" #include "src/core/lib/iomgr/iomgr.h" #include "src/proto/grpc/testing/echo.grpc.pb.h" @@ -65,7 +65,7 @@ enum class Protocol { INPROC, TCP }; class TestScenario { public: TestScenario(bool serve_callback, Protocol protocol, bool intercept, - const TString& creds_type) + const TString& creds_type) : callback_server(serve_callback), protocol(protocol), use_interceptors(intercept), @@ -74,7 +74,7 @@ class TestScenario { bool callback_server; Protocol protocol; bool use_interceptors; - const TString credentials_type; + const TString credentials_type; }; static std::ostream& operator<<(std::ostream& out, @@ -180,7 +180,7 @@ class ClientCallbackEnd2endTest } void SendRpcs(int num_rpcs, bool with_binary_metadata) { - TString test_string(""); + TString test_string(""); for (int i = 0; i < num_rpcs; i++) { EchoRequest request; EchoResponse response; @@ -188,12 +188,12 @@ class ClientCallbackEnd2endTest test_string += "Hello world. "; request.set_message(test_string); - TString val; + TString val; if (with_binary_metadata) { request.mutable_param()->set_echo_metadata(true); char bytes[8] = {'\0', '\1', '\2', '\3', '\4', '\5', '\6', static_cast<char>(i)}; - val = TString(bytes, 8); + val = TString(bytes, 8); cli_ctx.AddMetadata("custom-bin", val); } @@ -228,8 +228,8 @@ class ClientCallbackEnd2endTest } void SendRpcsGeneric(int num_rpcs, bool maybe_except) { - const TString kMethodName("/grpc.testing.EchoTestService/Echo"); - TString test_string(""); + const TString kMethodName("/grpc.testing.EchoTestService/Echo"); + TString test_string(""); for (int i = 0; i < num_rpcs; i++) { EchoRequest request; std::unique_ptr<ByteBuffer> send_buf; @@ -269,17 +269,17 @@ class ClientCallbackEnd2endTest } } - void SendGenericEchoAsBidi(int num_rpcs, int reuses, bool do_writes_done) { - const TString kMethodName("/grpc.testing.EchoTestService/Echo"); - TString test_string(""); + void SendGenericEchoAsBidi(int num_rpcs, int reuses, bool do_writes_done) { + const TString kMethodName("/grpc.testing.EchoTestService/Echo"); + TString test_string(""); for (int i = 0; i < num_rpcs; i++) { test_string += "Hello world. "; class Client : public grpc::experimental::ClientBidiReactor<ByteBuffer, ByteBuffer> { public: - Client(ClientCallbackEnd2endTest* test, const TString& method_name, - const TString& test_str, int reuses, bool do_writes_done) - : reuses_remaining_(reuses), do_writes_done_(do_writes_done) { + Client(ClientCallbackEnd2endTest* test, const TString& method_name, + const TString& test_str, int reuses, bool do_writes_done) + : reuses_remaining_(reuses), do_writes_done_(do_writes_done) { activate_ = [this, test, method_name, test_str] { if (reuses_remaining_ > 0) { cli_ctx_.reset(new ClientContext); @@ -299,11 +299,11 @@ class ClientCallbackEnd2endTest }; activate_(); } - void OnWriteDone(bool /*ok*/) override { - if (do_writes_done_) { - StartWritesDone(); - } - } + void OnWriteDone(bool /*ok*/) override { + if (do_writes_done_) { + StartWritesDone(); + } + } void OnReadDone(bool /*ok*/) override { EchoResponse response; EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response)); @@ -329,11 +329,11 @@ class ClientCallbackEnd2endTest std::mutex mu_; std::condition_variable cv_; bool done_ = false; - const bool do_writes_done_; - }; + const bool do_writes_done_; + }; + + Client rpc(this, kMethodName, test_string, reuses, do_writes_done); - Client rpc(this, kMethodName, test_string, reuses, do_writes_done); - rpc.Await(); } } @@ -355,102 +355,102 @@ TEST_P(ClientCallbackEnd2endTest, SimpleRpc) { SendRpcs(1, false); } -TEST_P(ClientCallbackEnd2endTest, SimpleRpcExpectedError) { +TEST_P(ClientCallbackEnd2endTest, SimpleRpcExpectedError) { MAYBE_SKIP_TEST; ResetStub(); - - EchoRequest request; - EchoResponse response; - ClientContext cli_ctx; - ErrorStatus error_status; - - request.set_message("Hello failure"); - error_status.set_code(1); // CANCELLED - error_status.set_error_message("cancel error message"); - *request.mutable_param()->mutable_expected_error() = error_status; - - std::mutex mu; + + EchoRequest request; + EchoResponse response; + ClientContext cli_ctx; + ErrorStatus error_status; + + request.set_message("Hello failure"); + error_status.set_code(1); // CANCELLED + error_status.set_error_message("cancel error message"); + *request.mutable_param()->mutable_expected_error() = error_status; + + std::mutex mu; std::condition_variable cv; bool done = false; - stub_->experimental_async()->Echo( - &cli_ctx, &request, &response, - [&response, &done, &mu, &cv, &error_status](Status s) { - EXPECT_EQ("", response.message()); - EXPECT_EQ(error_status.code(), s.error_code()); - EXPECT_EQ(error_status.error_message(), s.error_message()); - std::lock_guard<std::mutex> l(mu); - done = true; - cv.notify_one(); - }); - - std::unique_lock<std::mutex> l(mu); + stub_->experimental_async()->Echo( + &cli_ctx, &request, &response, + [&response, &done, &mu, &cv, &error_status](Status s) { + EXPECT_EQ("", response.message()); + EXPECT_EQ(error_status.code(), s.error_code()); + EXPECT_EQ(error_status.error_message(), s.error_message()); + std::lock_guard<std::mutex> l(mu); + done = true; + cv.notify_one(); + }); + + std::unique_lock<std::mutex> l(mu); while (!done) { cv.wait(l); } } -TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLockNested) { - MAYBE_SKIP_TEST; - ResetStub(); - - // The request/response state associated with an RPC and the synchronization - // variables needed to notify its completion. - struct RpcState { - std::mutex mu; - std::condition_variable cv; - bool done = false; - EchoRequest request; - EchoResponse response; - ClientContext cli_ctx; - - RpcState() = default; - ~RpcState() { - // Grab the lock to prevent destruction while another is still holding - // lock - std::lock_guard<std::mutex> lock(mu); - } - }; - std::vector<RpcState> rpc_state(3); - for (size_t i = 0; i < rpc_state.size(); i++) { - TString message = "Hello locked world"; - message += ToString(i); - rpc_state[i].request.set_message(message); - } - - // Grab a lock and then start an RPC whose callback grabs the same lock and - // then calls this function to start the next RPC under lock (up to a limit of - // the size of the rpc_state vector). - std::function<void(int)> nested_call = [this, &nested_call, - &rpc_state](int index) { - std::lock_guard<std::mutex> l(rpc_state[index].mu); - stub_->experimental_async()->Echo( - &rpc_state[index].cli_ctx, &rpc_state[index].request, - &rpc_state[index].response, - [index, &nested_call, &rpc_state](Status s) { - std::lock_guard<std::mutex> l1(rpc_state[index].mu); - EXPECT_TRUE(s.ok()); - rpc_state[index].done = true; - rpc_state[index].cv.notify_all(); - // Call the next level of nesting if possible - if (index + 1 < rpc_state.size()) { - nested_call(index + 1); - } - }); - }; - - nested_call(0); - - // Wait for completion notifications from all RPCs. Order doesn't matter. - for (RpcState& state : rpc_state) { - std::unique_lock<std::mutex> l(state.mu); - while (!state.done) { - state.cv.wait(l); - } - EXPECT_EQ(state.request.message(), state.response.message()); - } -} - +TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLockNested) { + MAYBE_SKIP_TEST; + ResetStub(); + + // The request/response state associated with an RPC and the synchronization + // variables needed to notify its completion. + struct RpcState { + std::mutex mu; + std::condition_variable cv; + bool done = false; + EchoRequest request; + EchoResponse response; + ClientContext cli_ctx; + + RpcState() = default; + ~RpcState() { + // Grab the lock to prevent destruction while another is still holding + // lock + std::lock_guard<std::mutex> lock(mu); + } + }; + std::vector<RpcState> rpc_state(3); + for (size_t i = 0; i < rpc_state.size(); i++) { + TString message = "Hello locked world"; + message += ToString(i); + rpc_state[i].request.set_message(message); + } + + // Grab a lock and then start an RPC whose callback grabs the same lock and + // then calls this function to start the next RPC under lock (up to a limit of + // the size of the rpc_state vector). + std::function<void(int)> nested_call = [this, &nested_call, + &rpc_state](int index) { + std::lock_guard<std::mutex> l(rpc_state[index].mu); + stub_->experimental_async()->Echo( + &rpc_state[index].cli_ctx, &rpc_state[index].request, + &rpc_state[index].response, + [index, &nested_call, &rpc_state](Status s) { + std::lock_guard<std::mutex> l1(rpc_state[index].mu); + EXPECT_TRUE(s.ok()); + rpc_state[index].done = true; + rpc_state[index].cv.notify_all(); + // Call the next level of nesting if possible + if (index + 1 < rpc_state.size()) { + nested_call(index + 1); + } + }); + }; + + nested_call(0); + + // Wait for completion notifications from all RPCs. Order doesn't matter. + for (RpcState& state : rpc_state) { + std::unique_lock<std::mutex> l(state.mu); + while (!state.done) { + state.cv.wait(l); + } + EXPECT_EQ(state.request.message(), state.response.message()); + } +} + TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLock) { MAYBE_SKIP_TEST; ResetStub(); @@ -533,21 +533,21 @@ TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) { TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) { MAYBE_SKIP_TEST; ResetStub(); - SendGenericEchoAsBidi(10, 1, /*do_writes_done=*/true); + SendGenericEchoAsBidi(10, 1, /*do_writes_done=*/true); } TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) { MAYBE_SKIP_TEST; ResetStub(); - SendGenericEchoAsBidi(10, 10, /*do_writes_done=*/true); + SendGenericEchoAsBidi(10, 10, /*do_writes_done=*/true); +} + +TEST_P(ClientCallbackEnd2endTest, GenericRpcNoWritesDone) { + MAYBE_SKIP_TEST; + ResetStub(); + SendGenericEchoAsBidi(1, 1, /*do_writes_done=*/false); } -TEST_P(ClientCallbackEnd2endTest, GenericRpcNoWritesDone) { - MAYBE_SKIP_TEST; - ResetStub(); - SendGenericEchoAsBidi(1, 1, /*do_writes_done=*/false); -} - #if GRPC_ALLOW_EXCEPTIONS TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) { MAYBE_SKIP_TEST; @@ -619,7 +619,7 @@ TEST_P(ClientCallbackEnd2endTest, RequestEchoServerCancel) { ClientContext context; request.set_message("hello"); context.AddMetadata(kServerTryCancelRequest, - ToString(CANCEL_BEFORE_PROCESSING)); + ToString(CANCEL_BEFORE_PROCESSING)); std::mutex mu; std::condition_variable cv; @@ -654,14 +654,14 @@ class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> { : server_try_cancel_(server_try_cancel), num_msgs_to_send_(num_msgs_to_send), client_cancel_{client_cancel} { - TString msg{"Hello server."}; + TString msg{"Hello server."}; for (int i = 0; i < num_msgs_to_send; i++) { desired_ += msg; } if (server_try_cancel != DO_NOT_CANCEL) { // Send server_try_cancel value in the client metadata context_.AddMetadata(kServerTryCancelRequest, - ToString(server_try_cancel)); + ToString(server_try_cancel)); } context_.set_initial_metadata_corked(true); stub->experimental_async()->RequestStream(&context_, &response_, this); @@ -735,7 +735,7 @@ class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> { const ServerTryCancelRequestPhase server_try_cancel_; int num_msgs_sent_{0}; const int num_msgs_to_send_; - TString desired_; + TString desired_; const ClientCancelInfo client_cancel_; std::mutex mu_; std::condition_variable cv_; @@ -860,72 +860,72 @@ TEST_P(ClientCallbackEnd2endTest, UnaryReactor) { } } -TEST_P(ClientCallbackEnd2endTest, GenericUnaryReactor) { - MAYBE_SKIP_TEST; - ResetStub(); - const TString kMethodName("/grpc.testing.EchoTestService/Echo"); - class UnaryClient : public grpc::experimental::ClientUnaryReactor { - public: - UnaryClient(grpc::GenericStub* stub, const TString& method_name) { - cli_ctx_.AddMetadata("key1", "val1"); - cli_ctx_.AddMetadata("key2", "val2"); - request_.mutable_param()->set_echo_metadata_initially(true); - request_.set_message("Hello metadata"); - send_buf_ = SerializeToByteBuffer(&request_); - - stub->experimental().PrepareUnaryCall(&cli_ctx_, method_name, - send_buf_.get(), &recv_buf_, this); - StartCall(); - } - void OnReadInitialMetadataDone(bool ok) override { - EXPECT_TRUE(ok); - EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key1")); - EXPECT_EQ( - "val1", - ToString(cli_ctx_.GetServerInitialMetadata().find("key1")->second)); - EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key2")); - EXPECT_EQ( - "val2", - ToString(cli_ctx_.GetServerInitialMetadata().find("key2")->second)); - initial_metadata_done_ = true; - } - void OnDone(const Status& s) override { - EXPECT_TRUE(initial_metadata_done_); - EXPECT_EQ(0u, cli_ctx_.GetServerTrailingMetadata().size()); - EXPECT_TRUE(s.ok()); - EchoResponse response; - EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response)); - EXPECT_EQ(request_.message(), response.message()); - std::unique_lock<std::mutex> l(mu_); - done_ = true; - cv_.notify_one(); - } - void Await() { - std::unique_lock<std::mutex> l(mu_); - while (!done_) { - cv_.wait(l); - } - } - - private: - EchoRequest request_; - std::unique_ptr<ByteBuffer> send_buf_; - ByteBuffer recv_buf_; - ClientContext cli_ctx_; - std::mutex mu_; - std::condition_variable cv_; - bool done_{false}; - bool initial_metadata_done_{false}; - }; - - UnaryClient test{generic_stub_.get(), kMethodName}; - test.Await(); - // Make sure that the server interceptors were not notified of a cancel - if (GetParam().use_interceptors) { - EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); - } -} - +TEST_P(ClientCallbackEnd2endTest, GenericUnaryReactor) { + MAYBE_SKIP_TEST; + ResetStub(); + const TString kMethodName("/grpc.testing.EchoTestService/Echo"); + class UnaryClient : public grpc::experimental::ClientUnaryReactor { + public: + UnaryClient(grpc::GenericStub* stub, const TString& method_name) { + cli_ctx_.AddMetadata("key1", "val1"); + cli_ctx_.AddMetadata("key2", "val2"); + request_.mutable_param()->set_echo_metadata_initially(true); + request_.set_message("Hello metadata"); + send_buf_ = SerializeToByteBuffer(&request_); + + stub->experimental().PrepareUnaryCall(&cli_ctx_, method_name, + send_buf_.get(), &recv_buf_, this); + StartCall(); + } + void OnReadInitialMetadataDone(bool ok) override { + EXPECT_TRUE(ok); + EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key1")); + EXPECT_EQ( + "val1", + ToString(cli_ctx_.GetServerInitialMetadata().find("key1")->second)); + EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key2")); + EXPECT_EQ( + "val2", + ToString(cli_ctx_.GetServerInitialMetadata().find("key2")->second)); + initial_metadata_done_ = true; + } + void OnDone(const Status& s) override { + EXPECT_TRUE(initial_metadata_done_); + EXPECT_EQ(0u, cli_ctx_.GetServerTrailingMetadata().size()); + EXPECT_TRUE(s.ok()); + EchoResponse response; + EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response)); + EXPECT_EQ(request_.message(), response.message()); + std::unique_lock<std::mutex> l(mu_); + done_ = true; + cv_.notify_one(); + } + void Await() { + std::unique_lock<std::mutex> l(mu_); + while (!done_) { + cv_.wait(l); + } + } + + private: + EchoRequest request_; + std::unique_ptr<ByteBuffer> send_buf_; + ByteBuffer recv_buf_; + ClientContext cli_ctx_; + std::mutex mu_; + std::condition_variable cv_; + bool done_{false}; + bool initial_metadata_done_{false}; + }; + + UnaryClient test{generic_stub_.get(), kMethodName}; + test.Await(); + // Make sure that the server interceptors were not notified of a cancel + if (GetParam().use_interceptors) { + EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); + } +} + class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> { public: ReadClient(grpc::testing::EchoTestService::Stub* stub, @@ -935,7 +935,7 @@ class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> { if (server_try_cancel_ != DO_NOT_CANCEL) { // Send server_try_cancel value in the client metadata context_.AddMetadata(kServerTryCancelRequest, - ToString(server_try_cancel)); + ToString(server_try_cancel)); } request_.set_message("Hello client "); stub->experimental_async()->ResponseStream(&context_, &request_, this); @@ -956,7 +956,7 @@ class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> { } else { EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend); EXPECT_EQ(response_.message(), - request_.message() + ToString(reads_complete_)); + request_.message() + ToString(reads_complete_)); reads_complete_++; if (client_cancel_.cancel && reads_complete_ == client_cancel_.ops_before_cancel) { @@ -1088,20 +1088,20 @@ class BidiClient public: BidiClient(grpc::testing::EchoTestService::Stub* stub, ServerTryCancelRequestPhase server_try_cancel, - int num_msgs_to_send, bool cork_metadata, bool first_write_async, - ClientCancelInfo client_cancel = {}) + int num_msgs_to_send, bool cork_metadata, bool first_write_async, + ClientCancelInfo client_cancel = {}) : server_try_cancel_(server_try_cancel), msgs_to_send_{num_msgs_to_send}, client_cancel_{client_cancel} { if (server_try_cancel_ != DO_NOT_CANCEL) { // Send server_try_cancel value in the client metadata context_.AddMetadata(kServerTryCancelRequest, - ToString(server_try_cancel)); + ToString(server_try_cancel)); } request_.set_message("Hello fren "); - context_.set_initial_metadata_corked(cork_metadata); + context_.set_initial_metadata_corked(cork_metadata); stub->experimental_async()->BidiStream(&context_, this); - MaybeAsyncWrite(first_write_async); + MaybeAsyncWrite(first_write_async); StartRead(&response_); StartCall(); } @@ -1122,10 +1122,10 @@ class BidiClient } } void OnWriteDone(bool ok) override { - if (async_write_thread_.joinable()) { - async_write_thread_.join(); - RemoveHold(); - } + if (async_write_thread_.joinable()) { + async_write_thread_.join(); + RemoveHold(); + } if (server_try_cancel_ == DO_NOT_CANCEL) { EXPECT_TRUE(ok); } else if (!ok) { @@ -1190,26 +1190,26 @@ class BidiClient } private: - void MaybeAsyncWrite(bool first_write_async) { - if (first_write_async) { - // Make sure that we have a write to issue. - // TODO(vjpai): Make this work with 0 writes case as well. - assert(msgs_to_send_ >= 1); - - AddHold(); - async_write_thread_ = std::thread([this] { - std::unique_lock<std::mutex> lock(async_write_thread_mu_); - async_write_thread_cv_.wait( - lock, [this] { return async_write_thread_start_; }); - MaybeWrite(); - }); - std::lock_guard<std::mutex> lock(async_write_thread_mu_); - async_write_thread_start_ = true; - async_write_thread_cv_.notify_one(); - return; - } - MaybeWrite(); - } + void MaybeAsyncWrite(bool first_write_async) { + if (first_write_async) { + // Make sure that we have a write to issue. + // TODO(vjpai): Make this work with 0 writes case as well. + assert(msgs_to_send_ >= 1); + + AddHold(); + async_write_thread_ = std::thread([this] { + std::unique_lock<std::mutex> lock(async_write_thread_mu_); + async_write_thread_cv_.wait( + lock, [this] { return async_write_thread_start_; }); + MaybeWrite(); + }); + std::lock_guard<std::mutex> lock(async_write_thread_mu_); + async_write_thread_start_ = true; + async_write_thread_cv_.notify_one(); + return; + } + MaybeWrite(); + } void MaybeWrite() { if (client_cancel_.cancel && writes_complete_ == client_cancel_.ops_before_cancel) { @@ -1231,18 +1231,57 @@ class BidiClient std::mutex mu_; std::condition_variable cv_; bool done_ = false; - std::thread async_write_thread_; - bool async_write_thread_start_ = false; - std::mutex async_write_thread_mu_; - std::condition_variable async_write_thread_cv_; + std::thread async_write_thread_; + bool async_write_thread_start_ = false; + std::mutex async_write_thread_mu_; + std::condition_variable async_write_thread_cv_; }; TEST_P(ClientCallbackEnd2endTest, BidiStream) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test(stub_.get(), DO_NOT_CANCEL, - kServerDefaultResponseStreamsToSend, - /*cork_metadata=*/false, /*first_write_async=*/false); + BidiClient test(stub_.get(), DO_NOT_CANCEL, + kServerDefaultResponseStreamsToSend, + /*cork_metadata=*/false, /*first_write_async=*/false); + test.Await(); + // Make sure that the server interceptors were not notified of a cancel + if (GetParam().use_interceptors) { + EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); + } +} + +TEST_P(ClientCallbackEnd2endTest, BidiStreamFirstWriteAsync) { + MAYBE_SKIP_TEST; + ResetStub(); + BidiClient test(stub_.get(), DO_NOT_CANCEL, + kServerDefaultResponseStreamsToSend, + /*cork_metadata=*/false, /*first_write_async=*/true); + test.Await(); + // Make sure that the server interceptors were not notified of a cancel + if (GetParam().use_interceptors) { + EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); + } +} + +TEST_P(ClientCallbackEnd2endTest, BidiStreamCorked) { + MAYBE_SKIP_TEST; + ResetStub(); + BidiClient test(stub_.get(), DO_NOT_CANCEL, + kServerDefaultResponseStreamsToSend, + /*cork_metadata=*/true, /*first_write_async=*/false); + test.Await(); + // Make sure that the server interceptors were not notified of a cancel + if (GetParam().use_interceptors) { + EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); + } +} + +TEST_P(ClientCallbackEnd2endTest, BidiStreamCorkedFirstWriteAsync) { + MAYBE_SKIP_TEST; + ResetStub(); + BidiClient test(stub_.get(), DO_NOT_CANCEL, + kServerDefaultResponseStreamsToSend, + /*cork_metadata=*/true, /*first_write_async=*/true); test.Await(); // Make sure that the server interceptors were not notified of a cancel if (GetParam().use_interceptors) { @@ -1250,52 +1289,13 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) { } } -TEST_P(ClientCallbackEnd2endTest, BidiStreamFirstWriteAsync) { - MAYBE_SKIP_TEST; - ResetStub(); - BidiClient test(stub_.get(), DO_NOT_CANCEL, - kServerDefaultResponseStreamsToSend, - /*cork_metadata=*/false, /*first_write_async=*/true); - test.Await(); - // Make sure that the server interceptors were not notified of a cancel - if (GetParam().use_interceptors) { - EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); - } -} - -TEST_P(ClientCallbackEnd2endTest, BidiStreamCorked) { - MAYBE_SKIP_TEST; - ResetStub(); - BidiClient test(stub_.get(), DO_NOT_CANCEL, - kServerDefaultResponseStreamsToSend, - /*cork_metadata=*/true, /*first_write_async=*/false); - test.Await(); - // Make sure that the server interceptors were not notified of a cancel - if (GetParam().use_interceptors) { - EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); - } -} - -TEST_P(ClientCallbackEnd2endTest, BidiStreamCorkedFirstWriteAsync) { - MAYBE_SKIP_TEST; - ResetStub(); - BidiClient test(stub_.get(), DO_NOT_CANCEL, - kServerDefaultResponseStreamsToSend, - /*cork_metadata=*/true, /*first_write_async=*/true); - test.Await(); - // Make sure that the server interceptors were not notified of a cancel - if (GetParam().use_interceptors) { - EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel()); - } -} - TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test(stub_.get(), DO_NOT_CANCEL, - kServerDefaultResponseStreamsToSend, - /*cork_metadata=*/false, /*first_write_async=*/false, - ClientCancelInfo(2)); + BidiClient test(stub_.get(), DO_NOT_CANCEL, + kServerDefaultResponseStreamsToSend, + /*cork_metadata=*/false, /*first_write_async=*/false, + ClientCancelInfo(2)); test.Await(); // Make sure that the server interceptors were notified of a cancel if (GetParam().use_interceptors) { @@ -1307,8 +1307,8 @@ TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) { TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test(stub_.get(), CANCEL_BEFORE_PROCESSING, /*num_msgs_to_send=*/2, - /*cork_metadata=*/false, /*first_write_async=*/false); + BidiClient test(stub_.get(), CANCEL_BEFORE_PROCESSING, /*num_msgs_to_send=*/2, + /*cork_metadata=*/false, /*first_write_async=*/false); test.Await(); // Make sure that the server interceptors were notified if (GetParam().use_interceptors) { @@ -1321,9 +1321,9 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) { TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test(stub_.get(), CANCEL_DURING_PROCESSING, - /*num_msgs_to_send=*/10, /*cork_metadata=*/false, - /*first_write_async=*/false); + BidiClient test(stub_.get(), CANCEL_DURING_PROCESSING, + /*num_msgs_to_send=*/10, /*cork_metadata=*/false, + /*first_write_async=*/false); test.Await(); // Make sure that the server interceptors were notified if (GetParam().use_interceptors) { @@ -1336,8 +1336,8 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) { TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) { MAYBE_SKIP_TEST; ResetStub(); - BidiClient test(stub_.get(), CANCEL_AFTER_PROCESSING, /*num_msgs_to_send=*/5, - /*cork_metadata=*/false, /*first_write_async=*/false); + BidiClient test(stub_.get(), CANCEL_AFTER_PROCESSING, /*num_msgs_to_send=*/5, + /*cork_metadata=*/false, /*first_write_async=*/false); test.Await(); // Make sure that the server interceptors were notified if (GetParam().use_interceptors) { @@ -1452,12 +1452,12 @@ TEST_P(ClientCallbackEnd2endTest, done_cv_.wait(l); } } - // RemoveHold under the same lock used for OnDone to make sure that we don't - // call OnDone directly or indirectly from the RemoveHold function. - void RemoveHoldUnderLock() { - std::unique_lock<std::mutex> l(mu_); - RemoveHold(); - } + // RemoveHold under the same lock used for OnDone to make sure that we don't + // call OnDone directly or indirectly from the RemoveHold function. + void RemoveHoldUnderLock() { + std::unique_lock<std::mutex> l(mu_); + RemoveHold(); + } const Status& status() { std::unique_lock<std::mutex> l(mu_); return status_; @@ -1502,7 +1502,7 @@ TEST_P(ClientCallbackEnd2endTest, ++reads_complete; } } - client.RemoveHoldUnderLock(); + client.RemoveHoldUnderLock(); client.Await(); EXPECT_EQ(kServerDefaultResponseStreamsToSend, reads_complete); @@ -1516,7 +1516,7 @@ std::vector<TestScenario> CreateTestScenarios(bool test_insecure) { #endif std::vector<TestScenario> scenarios; - std::vector<TString> credentials_types{ + std::vector<TString> credentials_types{ GetCredentialsProvider()->GetSecureCredentialsTypeList()}; auto insec_ok = [] { // Only allow insecure credentials type when it is registered with the @@ -1556,8 +1556,8 @@ INSTANTIATE_TEST_SUITE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest, } // namespace grpc int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - grpc::testing::TestEnvironment env(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(argc, argv); grpc_init(); int ret = RUN_ALL_TESTS(); grpc_shutdown(); |