path: root/contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc
author     heretic <heretic@yandex-team.ru>  2022-02-10 16:45:46 +0300
committer  Daniil Cherednik <dcherednik@yandex-team.ru>  2022-02-10 16:45:46 +0300
commit     81eddc8c0b55990194e112b02d127b87d54164a9 (patch)
tree       9142afc54d335ea52910662635b898e79e192e49 /contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc
parent     397cbe258b9e064f49c4ca575279f02f39fef76e (diff)
download   ydb-81eddc8c0b55990194e112b02d127b87d54164a9.tar.gz
Restoring authorship annotation for <heretic@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc')
-rw-r--r--  contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc  586
1 file changed, 293 insertions(+), 293 deletions(-)
diff --git a/contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc b/contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc
index 467f482d3f..12cb40a953 100644
--- a/contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc
+++ b/contrib/libs/grpc/test/cpp/end2end/client_callback_end2end_test.cc
@@ -25,15 +25,15 @@
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/client_callback.h>
-#include <gtest/gtest.h>
-
-#include <algorithm>
-#include <condition_variable>
-#include <functional>
-#include <mutex>
-#include <sstream>
-#include <thread>
-
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <condition_variable>
+#include <functional>
+#include <mutex>
+#include <sstream>
+#include <thread>
+
#include "src/core/lib/gpr/env.h"
#include "src/core/lib/iomgr/iomgr.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
@@ -65,7 +65,7 @@ enum class Protocol { INPROC, TCP };
class TestScenario {
public:
TestScenario(bool serve_callback, Protocol protocol, bool intercept,
- const TString& creds_type)
+ const TString& creds_type)
: callback_server(serve_callback),
protocol(protocol),
use_interceptors(intercept),
@@ -74,7 +74,7 @@ class TestScenario {
bool callback_server;
Protocol protocol;
bool use_interceptors;
- const TString credentials_type;
+ const TString credentials_type;
};
static std::ostream& operator<<(std::ostream& out,
@@ -180,7 +180,7 @@ class ClientCallbackEnd2endTest
}
void SendRpcs(int num_rpcs, bool with_binary_metadata) {
- TString test_string("");
+ TString test_string("");
for (int i = 0; i < num_rpcs; i++) {
EchoRequest request;
EchoResponse response;
@@ -188,12 +188,12 @@ class ClientCallbackEnd2endTest
test_string += "Hello world. ";
request.set_message(test_string);
- TString val;
+ TString val;
if (with_binary_metadata) {
request.mutable_param()->set_echo_metadata(true);
char bytes[8] = {'\0', '\1', '\2', '\3',
'\4', '\5', '\6', static_cast<char>(i)};
- val = TString(bytes, 8);
+ val = TString(bytes, 8);
cli_ctx.AddMetadata("custom-bin", val);
}
@@ -228,8 +228,8 @@ class ClientCallbackEnd2endTest
}
void SendRpcsGeneric(int num_rpcs, bool maybe_except) {
- const TString kMethodName("/grpc.testing.EchoTestService/Echo");
- TString test_string("");
+ const TString kMethodName("/grpc.testing.EchoTestService/Echo");
+ TString test_string("");
for (int i = 0; i < num_rpcs; i++) {
EchoRequest request;
std::unique_ptr<ByteBuffer> send_buf;
@@ -269,17 +269,17 @@ class ClientCallbackEnd2endTest
}
}
- void SendGenericEchoAsBidi(int num_rpcs, int reuses, bool do_writes_done) {
- const TString kMethodName("/grpc.testing.EchoTestService/Echo");
- TString test_string("");
+ void SendGenericEchoAsBidi(int num_rpcs, int reuses, bool do_writes_done) {
+ const TString kMethodName("/grpc.testing.EchoTestService/Echo");
+ TString test_string("");
for (int i = 0; i < num_rpcs; i++) {
test_string += "Hello world. ";
class Client : public grpc::experimental::ClientBidiReactor<ByteBuffer,
ByteBuffer> {
public:
- Client(ClientCallbackEnd2endTest* test, const TString& method_name,
- const TString& test_str, int reuses, bool do_writes_done)
- : reuses_remaining_(reuses), do_writes_done_(do_writes_done) {
+ Client(ClientCallbackEnd2endTest* test, const TString& method_name,
+ const TString& test_str, int reuses, bool do_writes_done)
+ : reuses_remaining_(reuses), do_writes_done_(do_writes_done) {
activate_ = [this, test, method_name, test_str] {
if (reuses_remaining_ > 0) {
cli_ctx_.reset(new ClientContext);
@@ -299,11 +299,11 @@ class ClientCallbackEnd2endTest
};
activate_();
}
- void OnWriteDone(bool /*ok*/) override {
- if (do_writes_done_) {
- StartWritesDone();
- }
- }
+ void OnWriteDone(bool /*ok*/) override {
+ if (do_writes_done_) {
+ StartWritesDone();
+ }
+ }
void OnReadDone(bool /*ok*/) override {
EchoResponse response;
EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
@@ -329,11 +329,11 @@ class ClientCallbackEnd2endTest
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
- const bool do_writes_done_;
- };
+ const bool do_writes_done_;
+ };
+
+ Client rpc(this, kMethodName, test_string, reuses, do_writes_done);
- Client rpc(this, kMethodName, test_string, reuses, do_writes_done);
-
rpc.Await();
}
}
@@ -355,102 +355,102 @@ TEST_P(ClientCallbackEnd2endTest, SimpleRpc) {
SendRpcs(1, false);
}
-TEST_P(ClientCallbackEnd2endTest, SimpleRpcExpectedError) {
+TEST_P(ClientCallbackEnd2endTest, SimpleRpcExpectedError) {
MAYBE_SKIP_TEST;
ResetStub();
-
- EchoRequest request;
- EchoResponse response;
- ClientContext cli_ctx;
- ErrorStatus error_status;
-
- request.set_message("Hello failure");
- error_status.set_code(1); // CANCELLED
- error_status.set_error_message("cancel error message");
- *request.mutable_param()->mutable_expected_error() = error_status;
-
- std::mutex mu;
+
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext cli_ctx;
+ ErrorStatus error_status;
+
+ request.set_message("Hello failure");
+ error_status.set_code(1); // CANCELLED
+ error_status.set_error_message("cancel error message");
+ *request.mutable_param()->mutable_expected_error() = error_status;
+
+ std::mutex mu;
std::condition_variable cv;
bool done = false;
- stub_->experimental_async()->Echo(
- &cli_ctx, &request, &response,
- [&response, &done, &mu, &cv, &error_status](Status s) {
- EXPECT_EQ("", response.message());
- EXPECT_EQ(error_status.code(), s.error_code());
- EXPECT_EQ(error_status.error_message(), s.error_message());
- std::lock_guard<std::mutex> l(mu);
- done = true;
- cv.notify_one();
- });
-
- std::unique_lock<std::mutex> l(mu);
+ stub_->experimental_async()->Echo(
+ &cli_ctx, &request, &response,
+ [&response, &done, &mu, &cv, &error_status](Status s) {
+ EXPECT_EQ("", response.message());
+ EXPECT_EQ(error_status.code(), s.error_code());
+ EXPECT_EQ(error_status.error_message(), s.error_message());
+ std::lock_guard<std::mutex> l(mu);
+ done = true;
+ cv.notify_one();
+ });
+
+ std::unique_lock<std::mutex> l(mu);
while (!done) {
cv.wait(l);
}
}
-TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLockNested) {
- MAYBE_SKIP_TEST;
- ResetStub();
-
- // The request/response state associated with an RPC and the synchronization
- // variables needed to notify its completion.
- struct RpcState {
- std::mutex mu;
- std::condition_variable cv;
- bool done = false;
- EchoRequest request;
- EchoResponse response;
- ClientContext cli_ctx;
-
- RpcState() = default;
- ~RpcState() {
- // Grab the lock to prevent destruction while another is still holding
- // lock
- std::lock_guard<std::mutex> lock(mu);
- }
- };
- std::vector<RpcState> rpc_state(3);
- for (size_t i = 0; i < rpc_state.size(); i++) {
- TString message = "Hello locked world";
- message += ToString(i);
- rpc_state[i].request.set_message(message);
- }
-
- // Grab a lock and then start an RPC whose callback grabs the same lock and
- // then calls this function to start the next RPC under lock (up to a limit of
- // the size of the rpc_state vector).
- std::function<void(int)> nested_call = [this, &nested_call,
- &rpc_state](int index) {
- std::lock_guard<std::mutex> l(rpc_state[index].mu);
- stub_->experimental_async()->Echo(
- &rpc_state[index].cli_ctx, &rpc_state[index].request,
- &rpc_state[index].response,
- [index, &nested_call, &rpc_state](Status s) {
- std::lock_guard<std::mutex> l1(rpc_state[index].mu);
- EXPECT_TRUE(s.ok());
- rpc_state[index].done = true;
- rpc_state[index].cv.notify_all();
- // Call the next level of nesting if possible
- if (index + 1 < rpc_state.size()) {
- nested_call(index + 1);
- }
- });
- };
-
- nested_call(0);
-
- // Wait for completion notifications from all RPCs. Order doesn't matter.
- for (RpcState& state : rpc_state) {
- std::unique_lock<std::mutex> l(state.mu);
- while (!state.done) {
- state.cv.wait(l);
- }
- EXPECT_EQ(state.request.message(), state.response.message());
- }
-}
-
+TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLockNested) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+
+ // The request/response state associated with an RPC and the synchronization
+ // variables needed to notify its completion.
+ struct RpcState {
+ std::mutex mu;
+ std::condition_variable cv;
+ bool done = false;
+ EchoRequest request;
+ EchoResponse response;
+ ClientContext cli_ctx;
+
+ RpcState() = default;
+ ~RpcState() {
+ // Grab the lock to prevent destruction while another is still holding
+ // lock
+ std::lock_guard<std::mutex> lock(mu);
+ }
+ };
+ std::vector<RpcState> rpc_state(3);
+ for (size_t i = 0; i < rpc_state.size(); i++) {
+ TString message = "Hello locked world";
+ message += ToString(i);
+ rpc_state[i].request.set_message(message);
+ }
+
+ // Grab a lock and then start an RPC whose callback grabs the same lock and
+ // then calls this function to start the next RPC under lock (up to a limit of
+ // the size of the rpc_state vector).
+ std::function<void(int)> nested_call = [this, &nested_call,
+ &rpc_state](int index) {
+ std::lock_guard<std::mutex> l(rpc_state[index].mu);
+ stub_->experimental_async()->Echo(
+ &rpc_state[index].cli_ctx, &rpc_state[index].request,
+ &rpc_state[index].response,
+ [index, &nested_call, &rpc_state](Status s) {
+ std::lock_guard<std::mutex> l1(rpc_state[index].mu);
+ EXPECT_TRUE(s.ok());
+ rpc_state[index].done = true;
+ rpc_state[index].cv.notify_all();
+ // Call the next level of nesting if possible
+ if (index + 1 < rpc_state.size()) {
+ nested_call(index + 1);
+ }
+ });
+ };
+
+ nested_call(0);
+
+ // Wait for completion notifications from all RPCs. Order doesn't matter.
+ for (RpcState& state : rpc_state) {
+ std::unique_lock<std::mutex> l(state.mu);
+ while (!state.done) {
+ state.cv.wait(l);
+ }
+ EXPECT_EQ(state.request.message(), state.response.message());
+ }
+}
+
TEST_P(ClientCallbackEnd2endTest, SimpleRpcUnderLock) {
MAYBE_SKIP_TEST;
ResetStub();
@@ -533,21 +533,21 @@ TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidi) {
MAYBE_SKIP_TEST;
ResetStub();
- SendGenericEchoAsBidi(10, 1, /*do_writes_done=*/true);
+ SendGenericEchoAsBidi(10, 1, /*do_writes_done=*/true);
}
TEST_P(ClientCallbackEnd2endTest, SequentialGenericRpcsAsBidiWithReactorReuse) {
MAYBE_SKIP_TEST;
ResetStub();
- SendGenericEchoAsBidi(10, 10, /*do_writes_done=*/true);
+ SendGenericEchoAsBidi(10, 10, /*do_writes_done=*/true);
+}
+
+TEST_P(ClientCallbackEnd2endTest, GenericRpcNoWritesDone) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ SendGenericEchoAsBidi(1, 1, /*do_writes_done=*/false);
}
-TEST_P(ClientCallbackEnd2endTest, GenericRpcNoWritesDone) {
- MAYBE_SKIP_TEST;
- ResetStub();
- SendGenericEchoAsBidi(1, 1, /*do_writes_done=*/false);
-}
-
#if GRPC_ALLOW_EXCEPTIONS
TEST_P(ClientCallbackEnd2endTest, ExceptingRpc) {
MAYBE_SKIP_TEST;
@@ -619,7 +619,7 @@ TEST_P(ClientCallbackEnd2endTest, RequestEchoServerCancel) {
ClientContext context;
request.set_message("hello");
context.AddMetadata(kServerTryCancelRequest,
- ToString(CANCEL_BEFORE_PROCESSING));
+ ToString(CANCEL_BEFORE_PROCESSING));
std::mutex mu;
std::condition_variable cv;
@@ -654,14 +654,14 @@ class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
: server_try_cancel_(server_try_cancel),
num_msgs_to_send_(num_msgs_to_send),
client_cancel_{client_cancel} {
- TString msg{"Hello server."};
+ TString msg{"Hello server."};
for (int i = 0; i < num_msgs_to_send; i++) {
desired_ += msg;
}
if (server_try_cancel != DO_NOT_CANCEL) {
// Send server_try_cancel value in the client metadata
context_.AddMetadata(kServerTryCancelRequest,
- ToString(server_try_cancel));
+ ToString(server_try_cancel));
}
context_.set_initial_metadata_corked(true);
stub->experimental_async()->RequestStream(&context_, &response_, this);
@@ -735,7 +735,7 @@ class WriteClient : public grpc::experimental::ClientWriteReactor<EchoRequest> {
const ServerTryCancelRequestPhase server_try_cancel_;
int num_msgs_sent_{0};
const int num_msgs_to_send_;
- TString desired_;
+ TString desired_;
const ClientCancelInfo client_cancel_;
std::mutex mu_;
std::condition_variable cv_;
@@ -860,72 +860,72 @@ TEST_P(ClientCallbackEnd2endTest, UnaryReactor) {
}
}
-TEST_P(ClientCallbackEnd2endTest, GenericUnaryReactor) {
- MAYBE_SKIP_TEST;
- ResetStub();
- const TString kMethodName("/grpc.testing.EchoTestService/Echo");
- class UnaryClient : public grpc::experimental::ClientUnaryReactor {
- public:
- UnaryClient(grpc::GenericStub* stub, const TString& method_name) {
- cli_ctx_.AddMetadata("key1", "val1");
- cli_ctx_.AddMetadata("key2", "val2");
- request_.mutable_param()->set_echo_metadata_initially(true);
- request_.set_message("Hello metadata");
- send_buf_ = SerializeToByteBuffer(&request_);
-
- stub->experimental().PrepareUnaryCall(&cli_ctx_, method_name,
- send_buf_.get(), &recv_buf_, this);
- StartCall();
- }
- void OnReadInitialMetadataDone(bool ok) override {
- EXPECT_TRUE(ok);
- EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key1"));
- EXPECT_EQ(
- "val1",
- ToString(cli_ctx_.GetServerInitialMetadata().find("key1")->second));
- EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key2"));
- EXPECT_EQ(
- "val2",
- ToString(cli_ctx_.GetServerInitialMetadata().find("key2")->second));
- initial_metadata_done_ = true;
- }
- void OnDone(const Status& s) override {
- EXPECT_TRUE(initial_metadata_done_);
- EXPECT_EQ(0u, cli_ctx_.GetServerTrailingMetadata().size());
- EXPECT_TRUE(s.ok());
- EchoResponse response;
- EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
- EXPECT_EQ(request_.message(), response.message());
- std::unique_lock<std::mutex> l(mu_);
- done_ = true;
- cv_.notify_one();
- }
- void Await() {
- std::unique_lock<std::mutex> l(mu_);
- while (!done_) {
- cv_.wait(l);
- }
- }
-
- private:
- EchoRequest request_;
- std::unique_ptr<ByteBuffer> send_buf_;
- ByteBuffer recv_buf_;
- ClientContext cli_ctx_;
- std::mutex mu_;
- std::condition_variable cv_;
- bool done_{false};
- bool initial_metadata_done_{false};
- };
-
- UnaryClient test{generic_stub_.get(), kMethodName};
- test.Await();
- // Make sure that the server interceptors were not notified of a cancel
- if (GetParam().use_interceptors) {
- EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
- }
-}
-
+TEST_P(ClientCallbackEnd2endTest, GenericUnaryReactor) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ const TString kMethodName("/grpc.testing.EchoTestService/Echo");
+ class UnaryClient : public grpc::experimental::ClientUnaryReactor {
+ public:
+ UnaryClient(grpc::GenericStub* stub, const TString& method_name) {
+ cli_ctx_.AddMetadata("key1", "val1");
+ cli_ctx_.AddMetadata("key2", "val2");
+ request_.mutable_param()->set_echo_metadata_initially(true);
+ request_.set_message("Hello metadata");
+ send_buf_ = SerializeToByteBuffer(&request_);
+
+ stub->experimental().PrepareUnaryCall(&cli_ctx_, method_name,
+ send_buf_.get(), &recv_buf_, this);
+ StartCall();
+ }
+ void OnReadInitialMetadataDone(bool ok) override {
+ EXPECT_TRUE(ok);
+ EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key1"));
+ EXPECT_EQ(
+ "val1",
+ ToString(cli_ctx_.GetServerInitialMetadata().find("key1")->second));
+ EXPECT_EQ(1u, cli_ctx_.GetServerInitialMetadata().count("key2"));
+ EXPECT_EQ(
+ "val2",
+ ToString(cli_ctx_.GetServerInitialMetadata().find("key2")->second));
+ initial_metadata_done_ = true;
+ }
+ void OnDone(const Status& s) override {
+ EXPECT_TRUE(initial_metadata_done_);
+ EXPECT_EQ(0u, cli_ctx_.GetServerTrailingMetadata().size());
+ EXPECT_TRUE(s.ok());
+ EchoResponse response;
+ EXPECT_TRUE(ParseFromByteBuffer(&recv_buf_, &response));
+ EXPECT_EQ(request_.message(), response.message());
+ std::unique_lock<std::mutex> l(mu_);
+ done_ = true;
+ cv_.notify_one();
+ }
+ void Await() {
+ std::unique_lock<std::mutex> l(mu_);
+ while (!done_) {
+ cv_.wait(l);
+ }
+ }
+
+ private:
+ EchoRequest request_;
+ std::unique_ptr<ByteBuffer> send_buf_;
+ ByteBuffer recv_buf_;
+ ClientContext cli_ctx_;
+ std::mutex mu_;
+ std::condition_variable cv_;
+ bool done_{false};
+ bool initial_metadata_done_{false};
+ };
+
+ UnaryClient test{generic_stub_.get(), kMethodName};
+ test.Await();
+ // Make sure that the server interceptors were not notified of a cancel
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
+ }
+}
+
class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
public:
ReadClient(grpc::testing::EchoTestService::Stub* stub,
@@ -935,7 +935,7 @@ class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
if (server_try_cancel_ != DO_NOT_CANCEL) {
// Send server_try_cancel value in the client metadata
context_.AddMetadata(kServerTryCancelRequest,
- ToString(server_try_cancel));
+ ToString(server_try_cancel));
}
request_.set_message("Hello client ");
stub->experimental_async()->ResponseStream(&context_, &request_, this);
@@ -956,7 +956,7 @@ class ReadClient : public grpc::experimental::ClientReadReactor<EchoResponse> {
} else {
EXPECT_LE(reads_complete_, kServerDefaultResponseStreamsToSend);
EXPECT_EQ(response_.message(),
- request_.message() + ToString(reads_complete_));
+ request_.message() + ToString(reads_complete_));
reads_complete_++;
if (client_cancel_.cancel &&
reads_complete_ == client_cancel_.ops_before_cancel) {
@@ -1088,20 +1088,20 @@ class BidiClient
public:
BidiClient(grpc::testing::EchoTestService::Stub* stub,
ServerTryCancelRequestPhase server_try_cancel,
- int num_msgs_to_send, bool cork_metadata, bool first_write_async,
- ClientCancelInfo client_cancel = {})
+ int num_msgs_to_send, bool cork_metadata, bool first_write_async,
+ ClientCancelInfo client_cancel = {})
: server_try_cancel_(server_try_cancel),
msgs_to_send_{num_msgs_to_send},
client_cancel_{client_cancel} {
if (server_try_cancel_ != DO_NOT_CANCEL) {
// Send server_try_cancel value in the client metadata
context_.AddMetadata(kServerTryCancelRequest,
- ToString(server_try_cancel));
+ ToString(server_try_cancel));
}
request_.set_message("Hello fren ");
- context_.set_initial_metadata_corked(cork_metadata);
+ context_.set_initial_metadata_corked(cork_metadata);
stub->experimental_async()->BidiStream(&context_, this);
- MaybeAsyncWrite(first_write_async);
+ MaybeAsyncWrite(first_write_async);
StartRead(&response_);
StartCall();
}
@@ -1122,10 +1122,10 @@ class BidiClient
}
}
void OnWriteDone(bool ok) override {
- if (async_write_thread_.joinable()) {
- async_write_thread_.join();
- RemoveHold();
- }
+ if (async_write_thread_.joinable()) {
+ async_write_thread_.join();
+ RemoveHold();
+ }
if (server_try_cancel_ == DO_NOT_CANCEL) {
EXPECT_TRUE(ok);
} else if (!ok) {
@@ -1190,26 +1190,26 @@ class BidiClient
}
private:
- void MaybeAsyncWrite(bool first_write_async) {
- if (first_write_async) {
- // Make sure that we have a write to issue.
- // TODO(vjpai): Make this work with 0 writes case as well.
- assert(msgs_to_send_ >= 1);
-
- AddHold();
- async_write_thread_ = std::thread([this] {
- std::unique_lock<std::mutex> lock(async_write_thread_mu_);
- async_write_thread_cv_.wait(
- lock, [this] { return async_write_thread_start_; });
- MaybeWrite();
- });
- std::lock_guard<std::mutex> lock(async_write_thread_mu_);
- async_write_thread_start_ = true;
- async_write_thread_cv_.notify_one();
- return;
- }
- MaybeWrite();
- }
+ void MaybeAsyncWrite(bool first_write_async) {
+ if (first_write_async) {
+ // Make sure that we have a write to issue.
+ // TODO(vjpai): Make this work with 0 writes case as well.
+ assert(msgs_to_send_ >= 1);
+
+ AddHold();
+ async_write_thread_ = std::thread([this] {
+ std::unique_lock<std::mutex> lock(async_write_thread_mu_);
+ async_write_thread_cv_.wait(
+ lock, [this] { return async_write_thread_start_; });
+ MaybeWrite();
+ });
+ std::lock_guard<std::mutex> lock(async_write_thread_mu_);
+ async_write_thread_start_ = true;
+ async_write_thread_cv_.notify_one();
+ return;
+ }
+ MaybeWrite();
+ }
void MaybeWrite() {
if (client_cancel_.cancel &&
writes_complete_ == client_cancel_.ops_before_cancel) {
@@ -1231,18 +1231,57 @@ class BidiClient
std::mutex mu_;
std::condition_variable cv_;
bool done_ = false;
- std::thread async_write_thread_;
- bool async_write_thread_start_ = false;
- std::mutex async_write_thread_mu_;
- std::condition_variable async_write_thread_cv_;
+ std::thread async_write_thread_;
+ bool async_write_thread_start_ = false;
+ std::mutex async_write_thread_mu_;
+ std::condition_variable async_write_thread_cv_;
};
TEST_P(ClientCallbackEnd2endTest, BidiStream) {
MAYBE_SKIP_TEST;
ResetStub();
- BidiClient test(stub_.get(), DO_NOT_CANCEL,
- kServerDefaultResponseStreamsToSend,
- /*cork_metadata=*/false, /*first_write_async=*/false);
+ BidiClient test(stub_.get(), DO_NOT_CANCEL,
+ kServerDefaultResponseStreamsToSend,
+ /*cork_metadata=*/false, /*first_write_async=*/false);
+ test.Await();
+ // Make sure that the server interceptors were not notified of a cancel
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
+ }
+}
+
+TEST_P(ClientCallbackEnd2endTest, BidiStreamFirstWriteAsync) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ BidiClient test(stub_.get(), DO_NOT_CANCEL,
+ kServerDefaultResponseStreamsToSend,
+ /*cork_metadata=*/false, /*first_write_async=*/true);
+ test.Await();
+ // Make sure that the server interceptors were not notified of a cancel
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
+ }
+}
+
+TEST_P(ClientCallbackEnd2endTest, BidiStreamCorked) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ BidiClient test(stub_.get(), DO_NOT_CANCEL,
+ kServerDefaultResponseStreamsToSend,
+ /*cork_metadata=*/true, /*first_write_async=*/false);
+ test.Await();
+ // Make sure that the server interceptors were not notified of a cancel
+ if (GetParam().use_interceptors) {
+ EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
+ }
+}
+
+TEST_P(ClientCallbackEnd2endTest, BidiStreamCorkedFirstWriteAsync) {
+ MAYBE_SKIP_TEST;
+ ResetStub();
+ BidiClient test(stub_.get(), DO_NOT_CANCEL,
+ kServerDefaultResponseStreamsToSend,
+ /*cork_metadata=*/true, /*first_write_async=*/true);
test.Await();
// Make sure that the server interceptors were not notified of a cancel
if (GetParam().use_interceptors) {
@@ -1250,52 +1289,13 @@ TEST_P(ClientCallbackEnd2endTest, BidiStream) {
}
}
-TEST_P(ClientCallbackEnd2endTest, BidiStreamFirstWriteAsync) {
- MAYBE_SKIP_TEST;
- ResetStub();
- BidiClient test(stub_.get(), DO_NOT_CANCEL,
- kServerDefaultResponseStreamsToSend,
- /*cork_metadata=*/false, /*first_write_async=*/true);
- test.Await();
- // Make sure that the server interceptors were not notified of a cancel
- if (GetParam().use_interceptors) {
- EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
- }
-}
-
-TEST_P(ClientCallbackEnd2endTest, BidiStreamCorked) {
- MAYBE_SKIP_TEST;
- ResetStub();
- BidiClient test(stub_.get(), DO_NOT_CANCEL,
- kServerDefaultResponseStreamsToSend,
- /*cork_metadata=*/true, /*first_write_async=*/false);
- test.Await();
- // Make sure that the server interceptors were not notified of a cancel
- if (GetParam().use_interceptors) {
- EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
- }
-}
-
-TEST_P(ClientCallbackEnd2endTest, BidiStreamCorkedFirstWriteAsync) {
- MAYBE_SKIP_TEST;
- ResetStub();
- BidiClient test(stub_.get(), DO_NOT_CANCEL,
- kServerDefaultResponseStreamsToSend,
- /*cork_metadata=*/true, /*first_write_async=*/true);
- test.Await();
- // Make sure that the server interceptors were not notified of a cancel
- if (GetParam().use_interceptors) {
- EXPECT_EQ(0, DummyInterceptor::GetNumTimesCancel());
- }
-}
-
TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) {
MAYBE_SKIP_TEST;
ResetStub();
- BidiClient test(stub_.get(), DO_NOT_CANCEL,
- kServerDefaultResponseStreamsToSend,
- /*cork_metadata=*/false, /*first_write_async=*/false,
- ClientCancelInfo(2));
+ BidiClient test(stub_.get(), DO_NOT_CANCEL,
+ kServerDefaultResponseStreamsToSend,
+ /*cork_metadata=*/false, /*first_write_async=*/false,
+ ClientCancelInfo(2));
test.Await();
// Make sure that the server interceptors were notified of a cancel
if (GetParam().use_interceptors) {
@@ -1307,8 +1307,8 @@ TEST_P(ClientCallbackEnd2endTest, ClientCancelsBidiStream) {
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
MAYBE_SKIP_TEST;
ResetStub();
- BidiClient test(stub_.get(), CANCEL_BEFORE_PROCESSING, /*num_msgs_to_send=*/2,
- /*cork_metadata=*/false, /*first_write_async=*/false);
+ BidiClient test(stub_.get(), CANCEL_BEFORE_PROCESSING, /*num_msgs_to_send=*/2,
+ /*cork_metadata=*/false, /*first_write_async=*/false);
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
@@ -1321,9 +1321,9 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelBefore) {
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
MAYBE_SKIP_TEST;
ResetStub();
- BidiClient test(stub_.get(), CANCEL_DURING_PROCESSING,
- /*num_msgs_to_send=*/10, /*cork_metadata=*/false,
- /*first_write_async=*/false);
+ BidiClient test(stub_.get(), CANCEL_DURING_PROCESSING,
+ /*num_msgs_to_send=*/10, /*cork_metadata=*/false,
+ /*first_write_async=*/false);
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
@@ -1336,8 +1336,8 @@ TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelDuring) {
TEST_P(ClientCallbackEnd2endTest, BidiStreamServerCancelAfter) {
MAYBE_SKIP_TEST;
ResetStub();
- BidiClient test(stub_.get(), CANCEL_AFTER_PROCESSING, /*num_msgs_to_send=*/5,
- /*cork_metadata=*/false, /*first_write_async=*/false);
+ BidiClient test(stub_.get(), CANCEL_AFTER_PROCESSING, /*num_msgs_to_send=*/5,
+ /*cork_metadata=*/false, /*first_write_async=*/false);
test.Await();
// Make sure that the server interceptors were notified
if (GetParam().use_interceptors) {
@@ -1452,12 +1452,12 @@ TEST_P(ClientCallbackEnd2endTest,
done_cv_.wait(l);
}
}
- // RemoveHold under the same lock used for OnDone to make sure that we don't
- // call OnDone directly or indirectly from the RemoveHold function.
- void RemoveHoldUnderLock() {
- std::unique_lock<std::mutex> l(mu_);
- RemoveHold();
- }
+ // RemoveHold under the same lock used for OnDone to make sure that we don't
+ // call OnDone directly or indirectly from the RemoveHold function.
+ void RemoveHoldUnderLock() {
+ std::unique_lock<std::mutex> l(mu_);
+ RemoveHold();
+ }
const Status& status() {
std::unique_lock<std::mutex> l(mu_);
return status_;
@@ -1502,7 +1502,7 @@ TEST_P(ClientCallbackEnd2endTest,
++reads_complete;
}
}
- client.RemoveHoldUnderLock();
+ client.RemoveHoldUnderLock();
client.Await();
EXPECT_EQ(kServerDefaultResponseStreamsToSend, reads_complete);
@@ -1516,7 +1516,7 @@ std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
#endif
std::vector<TestScenario> scenarios;
- std::vector<TString> credentials_types{
+ std::vector<TString> credentials_types{
GetCredentialsProvider()->GetSecureCredentialsTypeList()};
auto insec_ok = [] {
// Only allow insecure credentials type when it is registered with the
@@ -1556,8 +1556,8 @@ INSTANTIATE_TEST_SUITE_P(ClientCallbackEnd2endTest, ClientCallbackEnd2endTest,
} // namespace grpc
int main(int argc, char** argv) {
- ::testing::InitGoogleTest(&argc, argv);
- grpc::testing::TestEnvironment env(argc, argv);
+ ::testing::InitGoogleTest(&argc, argv);
+ grpc::testing::TestEnvironment env(argc, argv);
grpc_init();
int ret = RUN_ALL_TESTS();
grpc_shutdown();
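
For context: the callback unary Echo pattern exercised throughout this test (stub_->experimental_async()->Echo with a completion lambda) boils down to the following minimal sketch. It assumes the generated grpc::testing::EchoTestService stub from echo.grpc.pb.h and a server already listening on a placeholder address; it illustrates the API usage shown in the diff and is not part of the test file itself.

#include <condition_variable>
#include <mutex>
#include <utility>

#include <grpcpp/grpcpp.h>

#include "src/proto/grpc/testing/echo.grpc.pb.h"

int main() {
  // Placeholder target; the tests above use in-process or ephemeral TCP ports.
  auto channel = grpc::CreateChannel("localhost:50051",
                                     grpc::InsecureChannelCredentials());
  auto stub = grpc::testing::EchoTestService::NewStub(channel);

  grpc::ClientContext ctx;
  grpc::testing::EchoRequest request;
  grpc::testing::EchoResponse response;
  request.set_message("Hello world.");

  std::mutex mu;
  std::condition_variable cv;
  bool done = false;
  grpc::Status status;

  // The completion lambda runs on a gRPC-owned thread once the RPC finishes,
  // so the result is handed back to the caller via the mutex/condvar pair,
  // mirroring the pattern used in SendRpcs() in the test above.
  stub->experimental_async()->Echo(&ctx, &request, &response,
                                   [&](grpc::Status s) {
                                     std::lock_guard<std::mutex> l(mu);
                                     status = std::move(s);
                                     done = true;
                                     cv.notify_one();
                                   });

  std::unique_lock<std::mutex> l(mu);
  cv.wait(l, [&] { return done; });
  return status.ok() ? 0 : 1;
}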