aboutsummaryrefslogtreecommitdiffstats
path: root/contrib/libs/grpc/test/cpp/end2end/test_service_impl.h
diff options
context:
space:
mode:
authorheretic <heretic@yandex-team.ru>2022-02-10 16:45:46 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-10 16:45:46 +0300
commit81eddc8c0b55990194e112b02d127b87d54164a9 (patch)
tree9142afc54d335ea52910662635b898e79e192e49 /contrib/libs/grpc/test/cpp/end2end/test_service_impl.h
parent397cbe258b9e064f49c4ca575279f02f39fef76e (diff)
downloadydb-81eddc8c0b55990194e112b02d127b87d54164a9.tar.gz
Restoring authorship annotation for <heretic@yandex-team.ru>. Commit 2 of 2.
Diffstat (limited to 'contrib/libs/grpc/test/cpp/end2end/test_service_impl.h')
-rw-r--r--contrib/libs/grpc/test/cpp/end2end/test_service_impl.h782
1 files changed, 391 insertions, 391 deletions
diff --git a/contrib/libs/grpc/test/cpp/end2end/test_service_impl.h b/contrib/libs/grpc/test/cpp/end2end/test_service_impl.h
index 83ae90fe22..5f207f1979 100644
--- a/contrib/libs/grpc/test/cpp/end2end/test_service_impl.h
+++ b/contrib/libs/grpc/test/cpp/end2end/test_service_impl.h
@@ -15,31 +15,31 @@
* limitations under the License.
*
*/
-
+
#ifndef GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
#define GRPC_TEST_CPP_END2END_TEST_SERVICE_IMPL_H
-#include <condition_variable>
+#include <condition_variable>
#include <memory>
#include <mutex>
#include <grpc/grpc.h>
-#include <grpc/support/log.h>
-#include <grpcpp/alarm.h>
-#include <grpcpp/security/credentials.h>
+#include <grpc/support/log.h>
+#include <grpcpp/alarm.h>
+#include <grpcpp/security/credentials.h>
#include <grpcpp/server_context.h>
-#include <gtest/gtest.h>
+#include <gtest/gtest.h>
+
+#include <util/generic/string.h>
+#include <thread>
-#include <util/generic/string.h>
-#include <thread>
-
#include "src/proto/grpc/testing/echo.grpc.pb.h"
-#include "test/cpp/util/string_ref_helper.h"
+#include "test/cpp/util/string_ref_helper.h"
+
+#include <util/string/cast.h>
+
+using std::chrono::system_clock;
-#include <util/string/cast.h>
-
-using std::chrono::system_clock;
-
namespace grpc {
namespace testing {
@@ -59,406 +59,406 @@ typedef enum {
CANCEL_AFTER_PROCESSING
} ServerTryCancelRequestPhase;
-namespace internal {
-// When echo_deadline is requested, deadline seen in the ServerContext is set in
-// the response in seconds.
-void MaybeEchoDeadline(experimental::ServerContextBase* context,
- const EchoRequest* request, EchoResponse* response);
-
-void CheckServerAuthContext(const experimental::ServerContextBase* context,
- const TString& expected_transport_security_type,
- const TString& expected_client_identity);
-
-// Returns the number of pairs in metadata that exactly match the given
-// key-value pair. Returns -1 if the pair wasn't found.
-int MetadataMatchCount(
- const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
- const TString& key, const TString& value);
-
-int GetIntValueFromMetadataHelper(
- const char* key,
- const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
- int default_value);
-
-int GetIntValueFromMetadata(
- const char* key,
- const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
- int default_value);
-
-void ServerTryCancel(ServerContext* context);
-} // namespace internal
-
-class TestServiceSignaller {
- public:
- void ClientWaitUntilRpcStarted() {
- std::unique_lock<std::mutex> lock(mu_);
- cv_rpc_started_.wait(lock, [this] { return rpc_started_; });
- }
- void ServerWaitToContinue() {
- std::unique_lock<std::mutex> lock(mu_);
- cv_server_continue_.wait(lock, [this] { return server_should_continue_; });
- }
- void SignalClientThatRpcStarted() {
- std::unique_lock<std::mutex> lock(mu_);
- rpc_started_ = true;
- cv_rpc_started_.notify_one();
- }
- void SignalServerToContinue() {
- std::unique_lock<std::mutex> lock(mu_);
- server_should_continue_ = true;
- cv_server_continue_.notify_one();
- }
-
- private:
- std::mutex mu_;
- std::condition_variable cv_rpc_started_;
- bool rpc_started_ /* GUARDED_BY(mu_) */ = false;
- std::condition_variable cv_server_continue_;
- bool server_should_continue_ /* GUARDED_BY(mu_) */ = false;
-};
-
-template <typename RpcService>
-class TestMultipleServiceImpl : public RpcService {
+namespace internal {
+// When echo_deadline is requested, deadline seen in the ServerContext is set in
+// the response in seconds.
+void MaybeEchoDeadline(experimental::ServerContextBase* context,
+ const EchoRequest* request, EchoResponse* response);
+
+void CheckServerAuthContext(const experimental::ServerContextBase* context,
+ const TString& expected_transport_security_type,
+ const TString& expected_client_identity);
+
+// Returns the number of pairs in metadata that exactly match the given
+// key-value pair. Returns -1 if the pair wasn't found.
+int MetadataMatchCount(
+ const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
+ const TString& key, const TString& value);
+
+int GetIntValueFromMetadataHelper(
+ const char* key,
+ const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
+ int default_value);
+
+int GetIntValueFromMetadata(
+ const char* key,
+ const std::multimap<grpc::string_ref, grpc::string_ref>& metadata,
+ int default_value);
+
+void ServerTryCancel(ServerContext* context);
+} // namespace internal
+
+class TestServiceSignaller {
public:
- TestMultipleServiceImpl() : signal_client_(false), host_() {}
- explicit TestMultipleServiceImpl(const TString& host)
- : signal_client_(false), host_(new TString(host)) {}
+ void ClientWaitUntilRpcStarted() {
+ std::unique_lock<std::mutex> lock(mu_);
+ cv_rpc_started_.wait(lock, [this] { return rpc_started_; });
+ }
+ void ServerWaitToContinue() {
+ std::unique_lock<std::mutex> lock(mu_);
+ cv_server_continue_.wait(lock, [this] { return server_should_continue_; });
+ }
+ void SignalClientThatRpcStarted() {
+ std::unique_lock<std::mutex> lock(mu_);
+ rpc_started_ = true;
+ cv_rpc_started_.notify_one();
+ }
+ void SignalServerToContinue() {
+ std::unique_lock<std::mutex> lock(mu_);
+ server_should_continue_ = true;
+ cv_server_continue_.notify_one();
+ }
+
+ private:
+ std::mutex mu_;
+ std::condition_variable cv_rpc_started_;
+ bool rpc_started_ /* GUARDED_BY(mu_) */ = false;
+ std::condition_variable cv_server_continue_;
+ bool server_should_continue_ /* GUARDED_BY(mu_) */ = false;
+};
+
+template <typename RpcService>
+class TestMultipleServiceImpl : public RpcService {
+ public:
+ TestMultipleServiceImpl() : signal_client_(false), host_() {}
+ explicit TestMultipleServiceImpl(const TString& host)
+ : signal_client_(false), host_(new TString(host)) {}
Status Echo(ServerContext* context, const EchoRequest* request,
- EchoResponse* response) {
- if (request->has_param() &&
- request->param().server_notify_client_when_started()) {
- signaller_.SignalClientThatRpcStarted();
- signaller_.ServerWaitToContinue();
- }
-
- // A bit of sleep to make sure that short deadline tests fail
- if (request->has_param() && request->param().server_sleep_us() > 0) {
- gpr_sleep_until(
- gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
- gpr_time_from_micros(request->param().server_sleep_us(),
- GPR_TIMESPAN)));
- }
-
- if (request->has_param() && request->param().server_die()) {
- gpr_log(GPR_ERROR, "The request should not reach application handler.");
- GPR_ASSERT(0);
- }
- if (request->has_param() && request->param().has_expected_error()) {
- const auto& error = request->param().expected_error();
- return Status(static_cast<StatusCode>(error.code()),
- error.error_message(), error.binary_error_details());
- }
- int server_try_cancel = internal::GetIntValueFromMetadata(
- kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
- if (server_try_cancel > DO_NOT_CANCEL) {
- // Since this is a unary RPC, by the time this server handler is called,
- // the 'request' message is already read from the client. So the scenarios
- // in server_try_cancel don't make much sense. Just cancel the RPC as long
- // as server_try_cancel is not DO_NOT_CANCEL
- internal::ServerTryCancel(context);
- return Status::CANCELLED;
- }
-
- response->set_message(request->message());
- internal::MaybeEchoDeadline(context, request, response);
- if (host_) {
- response->mutable_param()->set_host(*host_);
- }
- if (request->has_param() && request->param().client_cancel_after_us()) {
- {
- std::unique_lock<std::mutex> lock(mu_);
- signal_client_ = true;
- ++rpcs_waiting_for_client_cancel_;
- }
- while (!context->IsCancelled()) {
- gpr_sleep_until(gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(request->param().client_cancel_after_us(),
- GPR_TIMESPAN)));
- }
- {
- std::unique_lock<std::mutex> lock(mu_);
- --rpcs_waiting_for_client_cancel_;
- }
- return Status::CANCELLED;
- } else if (request->has_param() &&
- request->param().server_cancel_after_us()) {
- gpr_sleep_until(gpr_time_add(
- gpr_now(GPR_CLOCK_REALTIME),
- gpr_time_from_micros(request->param().server_cancel_after_us(),
- GPR_TIMESPAN)));
- return Status::CANCELLED;
- } else if (!request->has_param() ||
- !request->param().skip_cancelled_check()) {
- EXPECT_FALSE(context->IsCancelled());
- }
-
- if (request->has_param() && request->param().echo_metadata_initially()) {
- const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
- context->client_metadata();
- for (const auto& metadatum : client_metadata) {
- context->AddInitialMetadata(::ToString(metadatum.first),
- ::ToString(metadatum.second));
- }
- }
-
- if (request->has_param() && request->param().echo_metadata()) {
- const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
- context->client_metadata();
- for (const auto& metadatum : client_metadata) {
- context->AddTrailingMetadata(::ToString(metadatum.first),
- ::ToString(metadatum.second));
- }
- // Terminate rpc with error and debug info in trailer.
- if (request->param().debug_info().stack_entries_size() ||
- !request->param().debug_info().detail().empty()) {
- TString serialized_debug_info =
- request->param().debug_info().SerializeAsString();
- context->AddTrailingMetadata(kDebugInfoTrailerKey,
- serialized_debug_info);
- return Status::CANCELLED;
- }
- }
- if (request->has_param() &&
- (request->param().expected_client_identity().length() > 0 ||
- request->param().check_auth_context())) {
- internal::CheckServerAuthContext(
- context, request->param().expected_transport_security_type(),
- request->param().expected_client_identity());
- }
- if (request->has_param() &&
- request->param().response_message_length() > 0) {
- response->set_message(
- TString(request->param().response_message_length(), '\0'));
- }
- if (request->has_param() && request->param().echo_peer()) {
- response->mutable_param()->set_peer(context->peer());
- }
- return Status::OK;
- }
-
- Status Echo1(ServerContext* context, const EchoRequest* request,
- EchoResponse* response) {
- return Echo(context, request, response);
- }
-
- Status Echo2(ServerContext* context, const EchoRequest* request,
- EchoResponse* response) {
- return Echo(context, request, response);
- }
-
+ EchoResponse* response) {
+ if (request->has_param() &&
+ request->param().server_notify_client_when_started()) {
+ signaller_.SignalClientThatRpcStarted();
+ signaller_.ServerWaitToContinue();
+ }
+
+ // A bit of sleep to make sure that short deadline tests fail
+ if (request->has_param() && request->param().server_sleep_us() > 0) {
+ gpr_sleep_until(
+ gpr_time_add(gpr_now(GPR_CLOCK_MONOTONIC),
+ gpr_time_from_micros(request->param().server_sleep_us(),
+ GPR_TIMESPAN)));
+ }
+
+ if (request->has_param() && request->param().server_die()) {
+ gpr_log(GPR_ERROR, "The request should not reach application handler.");
+ GPR_ASSERT(0);
+ }
+ if (request->has_param() && request->param().has_expected_error()) {
+ const auto& error = request->param().expected_error();
+ return Status(static_cast<StatusCode>(error.code()),
+ error.error_message(), error.binary_error_details());
+ }
+ int server_try_cancel = internal::GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+ if (server_try_cancel > DO_NOT_CANCEL) {
+ // Since this is a unary RPC, by the time this server handler is called,
+ // the 'request' message is already read from the client. So the scenarios
+ // in server_try_cancel don't make much sense. Just cancel the RPC as long
+ // as server_try_cancel is not DO_NOT_CANCEL
+ internal::ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
+ response->set_message(request->message());
+ internal::MaybeEchoDeadline(context, request, response);
+ if (host_) {
+ response->mutable_param()->set_host(*host_);
+ }
+ if (request->has_param() && request->param().client_cancel_after_us()) {
+ {
+ std::unique_lock<std::mutex> lock(mu_);
+ signal_client_ = true;
+ ++rpcs_waiting_for_client_cancel_;
+ }
+ while (!context->IsCancelled()) {
+ gpr_sleep_until(gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_micros(request->param().client_cancel_after_us(),
+ GPR_TIMESPAN)));
+ }
+ {
+ std::unique_lock<std::mutex> lock(mu_);
+ --rpcs_waiting_for_client_cancel_;
+ }
+ return Status::CANCELLED;
+ } else if (request->has_param() &&
+ request->param().server_cancel_after_us()) {
+ gpr_sleep_until(gpr_time_add(
+ gpr_now(GPR_CLOCK_REALTIME),
+ gpr_time_from_micros(request->param().server_cancel_after_us(),
+ GPR_TIMESPAN)));
+ return Status::CANCELLED;
+ } else if (!request->has_param() ||
+ !request->param().skip_cancelled_check()) {
+ EXPECT_FALSE(context->IsCancelled());
+ }
+
+ if (request->has_param() && request->param().echo_metadata_initially()) {
+ const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
+ context->client_metadata();
+ for (const auto& metadatum : client_metadata) {
+ context->AddInitialMetadata(::ToString(metadatum.first),
+ ::ToString(metadatum.second));
+ }
+ }
+
+ if (request->has_param() && request->param().echo_metadata()) {
+ const std::multimap<grpc::string_ref, grpc::string_ref>& client_metadata =
+ context->client_metadata();
+ for (const auto& metadatum : client_metadata) {
+ context->AddTrailingMetadata(::ToString(metadatum.first),
+ ::ToString(metadatum.second));
+ }
+ // Terminate rpc with error and debug info in trailer.
+ if (request->param().debug_info().stack_entries_size() ||
+ !request->param().debug_info().detail().empty()) {
+ TString serialized_debug_info =
+ request->param().debug_info().SerializeAsString();
+ context->AddTrailingMetadata(kDebugInfoTrailerKey,
+ serialized_debug_info);
+ return Status::CANCELLED;
+ }
+ }
+ if (request->has_param() &&
+ (request->param().expected_client_identity().length() > 0 ||
+ request->param().check_auth_context())) {
+ internal::CheckServerAuthContext(
+ context, request->param().expected_transport_security_type(),
+ request->param().expected_client_identity());
+ }
+ if (request->has_param() &&
+ request->param().response_message_length() > 0) {
+ response->set_message(
+ TString(request->param().response_message_length(), '\0'));
+ }
+ if (request->has_param() && request->param().echo_peer()) {
+ response->mutable_param()->set_peer(context->peer());
+ }
+ return Status::OK;
+ }
+
+ Status Echo1(ServerContext* context, const EchoRequest* request,
+ EchoResponse* response) {
+ return Echo(context, request, response);
+ }
+
+ Status Echo2(ServerContext* context, const EchoRequest* request,
+ EchoResponse* response) {
+ return Echo(context, request, response);
+ }
+
Status CheckClientInitialMetadata(ServerContext* context,
- const SimpleRequest42* /*request*/,
- SimpleResponse42* /*response*/) {
- EXPECT_EQ(internal::MetadataMatchCount(context->client_metadata(),
- kCheckClientInitialMetadataKey,
- kCheckClientInitialMetadataVal),
- 1);
- EXPECT_EQ(1u,
- context->client_metadata().count(kCheckClientInitialMetadataKey));
- return Status::OK;
- }
+ const SimpleRequest42* /*request*/,
+ SimpleResponse42* /*response*/) {
+ EXPECT_EQ(internal::MetadataMatchCount(context->client_metadata(),
+ kCheckClientInitialMetadataKey,
+ kCheckClientInitialMetadataVal),
+ 1);
+ EXPECT_EQ(1u,
+ context->client_metadata().count(kCheckClientInitialMetadataKey));
+ return Status::OK;
+ }
// Unimplemented is left unimplemented to test the returned error.
Status RequestStream(ServerContext* context,
ServerReader<EchoRequest>* reader,
- EchoResponse* response) {
- // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
- // the server by calling ServerContext::TryCancel() depending on the value:
- // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
- // any message from the client
- // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
- // reading messages from the client
- // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
- // all the messages from the client
- int server_try_cancel = internal::GetIntValueFromMetadata(
- kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
-
- EchoRequest request;
- response->set_message("");
-
- if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
- internal::ServerTryCancel(context);
- return Status::CANCELLED;
- }
-
- std::thread* server_try_cancel_thd = nullptr;
- if (server_try_cancel == CANCEL_DURING_PROCESSING) {
- server_try_cancel_thd =
- new std::thread([context] { internal::ServerTryCancel(context); });
- }
-
- int num_msgs_read = 0;
- while (reader->Read(&request)) {
- response->mutable_message()->append(request.message());
- }
- gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);
-
- if (server_try_cancel_thd != nullptr) {
- server_try_cancel_thd->join();
- delete server_try_cancel_thd;
- return Status::CANCELLED;
- }
-
- if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
- internal::ServerTryCancel(context);
- return Status::CANCELLED;
- }
-
- return Status::OK;
- }
-
- // Return 'kNumResponseStreamMsgs' messages.
- // TODO(yangg) make it generic by adding a parameter into EchoRequest
+ EchoResponse* response) {
+ // If 'server_try_cancel' is set in the metadata, the RPC is cancelled by
+ // the server by calling ServerContext::TryCancel() depending on the value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads
+ // any message from the client
+ // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
+ // reading messages from the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server reads
+ // all the messages from the client
+ int server_try_cancel = internal::GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+
+ EchoRequest request;
+ response->set_message("");
+
+ if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
+ internal::ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
+ std::thread* server_try_cancel_thd = nullptr;
+ if (server_try_cancel == CANCEL_DURING_PROCESSING) {
+ server_try_cancel_thd =
+ new std::thread([context] { internal::ServerTryCancel(context); });
+ }
+
+ int num_msgs_read = 0;
+ while (reader->Read(&request)) {
+ response->mutable_message()->append(request.message());
+ }
+ gpr_log(GPR_INFO, "Read: %d messages", num_msgs_read);
+
+ if (server_try_cancel_thd != nullptr) {
+ server_try_cancel_thd->join();
+ delete server_try_cancel_thd;
+ return Status::CANCELLED;
+ }
+
+ if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
+ internal::ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
+ return Status::OK;
+ }
+
+ // Return 'kNumResponseStreamMsgs' messages.
+ // TODO(yangg) make it generic by adding a parameter into EchoRequest
Status ResponseStream(ServerContext* context, const EchoRequest* request,
- ServerWriter<EchoResponse>* writer) {
- // If server_try_cancel is set in the metadata, the RPC is cancelled by the
- // server by calling ServerContext::TryCancel() depending on the value:
- // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
- // any messages to the client
- // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
- // writing messages to the client
- // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
- // all the messages to the client
- int server_try_cancel = internal::GetIntValueFromMetadata(
- kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
-
- int server_coalescing_api = internal::GetIntValueFromMetadata(
- kServerUseCoalescingApi, context->client_metadata(), 0);
-
- int server_responses_to_send = internal::GetIntValueFromMetadata(
- kServerResponseStreamsToSend, context->client_metadata(),
- kServerDefaultResponseStreamsToSend);
-
- if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
- internal::ServerTryCancel(context);
- return Status::CANCELLED;
- }
-
- EchoResponse response;
- std::thread* server_try_cancel_thd = nullptr;
- if (server_try_cancel == CANCEL_DURING_PROCESSING) {
- server_try_cancel_thd =
- new std::thread([context] { internal::ServerTryCancel(context); });
- }
-
- for (int i = 0; i < server_responses_to_send; i++) {
- response.set_message(request->message() + ::ToString(i));
- if (i == server_responses_to_send - 1 && server_coalescing_api != 0) {
- writer->WriteLast(response, WriteOptions());
- } else {
- writer->Write(response);
- }
- }
-
- if (server_try_cancel_thd != nullptr) {
- server_try_cancel_thd->join();
- delete server_try_cancel_thd;
- return Status::CANCELLED;
- }
-
- if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
- internal::ServerTryCancel(context);
- return Status::CANCELLED;
- }
-
- return Status::OK;
- }
-
- Status BidiStream(ServerContext* context,
- ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
- // If server_try_cancel is set in the metadata, the RPC is cancelled by the
- // server by calling ServerContext::TryCancel() depending on the value:
- // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
- // writes any messages from/to the client
- // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
- // reading/writing messages from/to the client
- // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
- // reads/writes all messages from/to the client
- int server_try_cancel = internal::GetIntValueFromMetadata(
- kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
-
- EchoRequest request;
- EchoResponse response;
-
- if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
- internal::ServerTryCancel(context);
- return Status::CANCELLED;
- }
-
- std::thread* server_try_cancel_thd = nullptr;
- if (server_try_cancel == CANCEL_DURING_PROCESSING) {
- server_try_cancel_thd =
- new std::thread([context] { internal::ServerTryCancel(context); });
- }
-
- // kServerFinishAfterNReads suggests after how many reads, the server should
- // write the last message and send status (coalesced using WriteLast)
- int server_write_last = internal::GetIntValueFromMetadata(
- kServerFinishAfterNReads, context->client_metadata(), 0);
-
- int read_counts = 0;
- while (stream->Read(&request)) {
- read_counts++;
- gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
- response.set_message(request.message());
- if (read_counts == server_write_last) {
- stream->WriteLast(response, WriteOptions());
- } else {
- stream->Write(response);
- }
- }
-
- if (server_try_cancel_thd != nullptr) {
- server_try_cancel_thd->join();
- delete server_try_cancel_thd;
- return Status::CANCELLED;
- }
-
- if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
- internal::ServerTryCancel(context);
- return Status::CANCELLED;
- }
-
- return Status::OK;
- }
-
- // Unimplemented is left unimplemented to test the returned error.
+ ServerWriter<EchoResponse>* writer) {
+ // If server_try_cancel is set in the metadata, the RPC is cancelled by the
+ // server by calling ServerContext::TryCancel() depending on the value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server writes
+ // any messages to the client
+ // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
+ // writing messages to the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server writes
+ // all the messages to the client
+ int server_try_cancel = internal::GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+
+ int server_coalescing_api = internal::GetIntValueFromMetadata(
+ kServerUseCoalescingApi, context->client_metadata(), 0);
+
+ int server_responses_to_send = internal::GetIntValueFromMetadata(
+ kServerResponseStreamsToSend, context->client_metadata(),
+ kServerDefaultResponseStreamsToSend);
+
+ if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
+ internal::ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
+ EchoResponse response;
+ std::thread* server_try_cancel_thd = nullptr;
+ if (server_try_cancel == CANCEL_DURING_PROCESSING) {
+ server_try_cancel_thd =
+ new std::thread([context] { internal::ServerTryCancel(context); });
+ }
+
+ for (int i = 0; i < server_responses_to_send; i++) {
+ response.set_message(request->message() + ::ToString(i));
+ if (i == server_responses_to_send - 1 && server_coalescing_api != 0) {
+ writer->WriteLast(response, WriteOptions());
+ } else {
+ writer->Write(response);
+ }
+ }
+
+ if (server_try_cancel_thd != nullptr) {
+ server_try_cancel_thd->join();
+ delete server_try_cancel_thd;
+ return Status::CANCELLED;
+ }
+
+ if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
+ internal::ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
+ return Status::OK;
+ }
+
+ Status BidiStream(ServerContext* context,
+ ServerReaderWriter<EchoResponse, EchoRequest>* stream) {
+ // If server_try_cancel is set in the metadata, the RPC is cancelled by the
+ // server by calling ServerContext::TryCancel() depending on the value:
+ // CANCEL_BEFORE_PROCESSING: The RPC is cancelled before the server reads/
+ // writes any messages from/to the client
+ // CANCEL_DURING_PROCESSING: The RPC is cancelled while the server is
+ // reading/writing messages from/to the client
+ // CANCEL_AFTER_PROCESSING: The RPC is cancelled after the server
+ // reads/writes all messages from/to the client
+ int server_try_cancel = internal::GetIntValueFromMetadata(
+ kServerTryCancelRequest, context->client_metadata(), DO_NOT_CANCEL);
+
+ EchoRequest request;
+ EchoResponse response;
+
+ if (server_try_cancel == CANCEL_BEFORE_PROCESSING) {
+ internal::ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
+ std::thread* server_try_cancel_thd = nullptr;
+ if (server_try_cancel == CANCEL_DURING_PROCESSING) {
+ server_try_cancel_thd =
+ new std::thread([context] { internal::ServerTryCancel(context); });
+ }
+
+ // kServerFinishAfterNReads suggests after how many reads, the server should
+ // write the last message and send status (coalesced using WriteLast)
+ int server_write_last = internal::GetIntValueFromMetadata(
+ kServerFinishAfterNReads, context->client_metadata(), 0);
+
+ int read_counts = 0;
+ while (stream->Read(&request)) {
+ read_counts++;
+ gpr_log(GPR_INFO, "recv msg %s", request.message().c_str());
+ response.set_message(request.message());
+ if (read_counts == server_write_last) {
+ stream->WriteLast(response, WriteOptions());
+ } else {
+ stream->Write(response);
+ }
+ }
+
+ if (server_try_cancel_thd != nullptr) {
+ server_try_cancel_thd->join();
+ delete server_try_cancel_thd;
+ return Status::CANCELLED;
+ }
+
+ if (server_try_cancel == CANCEL_AFTER_PROCESSING) {
+ internal::ServerTryCancel(context);
+ return Status::CANCELLED;
+ }
+
+ return Status::OK;
+ }
+
+ // Unimplemented is left unimplemented to test the returned error.
bool signal_client() {
std::unique_lock<std::mutex> lock(mu_);
return signal_client_;
}
- void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); }
- void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
- uint64_t RpcsWaitingForClientCancel() {
- std::unique_lock<std::mutex> lock(mu_);
- return rpcs_waiting_for_client_cancel_;
- }
+ void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); }
+ void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
+ uint64_t RpcsWaitingForClientCancel() {
+ std::unique_lock<std::mutex> lock(mu_);
+ return rpcs_waiting_for_client_cancel_;
+ }
private:
bool signal_client_;
std::mutex mu_;
- TestServiceSignaller signaller_;
- std::unique_ptr<TString> host_;
- uint64_t rpcs_waiting_for_client_cancel_ = 0;
+ TestServiceSignaller signaller_;
+ std::unique_ptr<TString> host_;
+ uint64_t rpcs_waiting_for_client_cancel_ = 0;
};
class CallbackTestServiceImpl
: public ::grpc::testing::EchoTestService::ExperimentalCallbackService {
public:
CallbackTestServiceImpl() : signal_client_(false), host_() {}
- explicit CallbackTestServiceImpl(const TString& host)
- : signal_client_(false), host_(new TString(host)) {}
+ explicit CallbackTestServiceImpl(const TString& host)
+ : signal_client_(false), host_(new TString(host)) {}
experimental::ServerUnaryReactor* Echo(
experimental::CallbackServerContext* context, const EchoRequest* request,
EchoResponse* response) override;
experimental::ServerUnaryReactor* CheckClientInitialMetadata(
- experimental::CallbackServerContext* context, const SimpleRequest42*,
- SimpleResponse42*) override;
+ experimental::CallbackServerContext* context, const SimpleRequest42*,
+ SimpleResponse42*) override;
experimental::ServerReadReactor<EchoRequest>* RequestStream(
experimental::CallbackServerContext* context,
@@ -476,19 +476,19 @@ class CallbackTestServiceImpl
std::unique_lock<std::mutex> lock(mu_);
return signal_client_;
}
- void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); }
- void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
+ void ClientWaitUntilRpcStarted() { signaller_.ClientWaitUntilRpcStarted(); }
+ void SignalServerToContinue() { signaller_.SignalServerToContinue(); }
private:
bool signal_client_;
std::mutex mu_;
- TestServiceSignaller signaller_;
- std::unique_ptr<TString> host_;
+ TestServiceSignaller signaller_;
+ std::unique_ptr<TString> host_;
};
-using TestServiceImpl =
- TestMultipleServiceImpl<::grpc::testing::EchoTestService::Service>;
-
+using TestServiceImpl =
+ TestMultipleServiceImpl<::grpc::testing::EchoTestService::Service>;
+
} // namespace testing
} // namespace grpc