diff options
author | dvshkurko <dvshkurko@yandex-team.ru> | 2022-02-10 16:45:51 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:45:51 +0300 |
commit | 321ee9bce31ec6e238be26dbcbe539cffa2c3309 (patch) | |
tree | 14407a2757cbf29eb97e266b7f07e851f971000c /contrib/libs/grpc/test/cpp/end2end/end2end_test.cc | |
parent | 2f6ca198245aeffd5e2d82b65927c2465b68b4f5 (diff) | |
download | ydb-321ee9bce31ec6e238be26dbcbe539cffa2c3309.tar.gz |
Restoring authorship annotation for <dvshkurko@yandex-team.ru>. Commit 1 of 2.
Diffstat (limited to 'contrib/libs/grpc/test/cpp/end2end/end2end_test.cc')
-rw-r--r-- | contrib/libs/grpc/test/cpp/end2end/end2end_test.cc | 4348 |
1 files changed, 2174 insertions, 2174 deletions
diff --git a/contrib/libs/grpc/test/cpp/end2end/end2end_test.cc b/contrib/libs/grpc/test/cpp/end2end/end2end_test.cc index ad2ddb7e84..9e1ae49b8d 100644 --- a/contrib/libs/grpc/test/cpp/end2end/end2end_test.cc +++ b/contrib/libs/grpc/test/cpp/end2end/end2end_test.cc @@ -1,95 +1,95 @@ -/* - * - * Copyright 2015 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -#include <grpc/grpc.h> -#include <grpc/support/alloc.h> -#include <grpc/support/log.h> -#include <grpc/support/time.h> -#include <grpcpp/channel.h> -#include <grpcpp/client_context.h> -#include <grpcpp/create_channel.h> -#include <grpcpp/impl/codegen/status_code_enum.h> -#include <grpcpp/resource_quota.h> -#include <grpcpp/security/auth_metadata_processor.h> -#include <grpcpp/security/credentials.h> -#include <grpcpp/security/server_credentials.h> -#include <grpcpp/server.h> -#include <grpcpp/server_builder.h> -#include <grpcpp/server_context.h> +/* + * + * Copyright 2015 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/log.h> +#include <grpc/support/time.h> +#include <grpcpp/channel.h> +#include <grpcpp/client_context.h> +#include <grpcpp/create_channel.h> +#include <grpcpp/impl/codegen/status_code_enum.h> +#include <grpcpp/resource_quota.h> +#include <grpcpp/security/auth_metadata_processor.h> +#include <grpcpp/security/credentials.h> +#include <grpcpp/security/server_credentials.h> +#include <grpcpp/server.h> +#include <grpcpp/server_builder.h> +#include <grpcpp/server_context.h> #include <grpcpp/support/string_ref.h> #include <grpcpp/test/channel_test_peer.h> - -#include <mutex> -#include <thread> - + +#include <mutex> +#include <thread> + #include "y_absl/strings/str_format.h" -#include "src/core/ext/filters/client_channel/backup_poller.h" -#include "src/core/lib/gpr/env.h" -#include "src/core/lib/iomgr/iomgr.h" -#include "src/core/lib/security/credentials/credentials.h" -#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" -#include "src/proto/grpc/testing/echo.grpc.pb.h" -#include "test/core/util/port.h" -#include "test/core/util/test_config.h" -#include "test/cpp/end2end/interceptors_util.h" -#include "test/cpp/end2end/test_service_impl.h" -#include "test/cpp/util/string_ref_helper.h" -#include "test/cpp/util/test_credentials_provider.h" - -#ifdef GRPC_POSIX_SOCKET_EV -#include "src/core/lib/iomgr/ev_posix.h" -#endif // GRPC_POSIX_SOCKET_EV - -#include <gtest/gtest.h> - -using grpc::testing::EchoRequest; -using grpc::testing::EchoResponse; -using grpc::testing::kTlsCredentialsType; -using std::chrono::system_clock; - -// MAYBE_SKIP_TEST is a macro to determine if this particular test configuration -// should be skipped based on a decision made at SetUp time. In particular, -// tests that use the callback server can only be run if the iomgr can run in -// the background or if the transport is in-process. -#define MAYBE_SKIP_TEST \ - do { \ - if (do_not_test_) { \ - return; \ - } \ - } while (0) - -namespace grpc { -namespace testing { -namespace { - +#include "src/core/ext/filters/client_channel/backup_poller.h" +#include "src/core/lib/gpr/env.h" +#include "src/core/lib/iomgr/iomgr.h" +#include "src/core/lib/security/credentials/credentials.h" +#include "src/proto/grpc/testing/duplicate/echo_duplicate.grpc.pb.h" +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/end2end/interceptors_util.h" +#include "test/cpp/end2end/test_service_impl.h" +#include "test/cpp/util/string_ref_helper.h" +#include "test/cpp/util/test_credentials_provider.h" + +#ifdef GRPC_POSIX_SOCKET_EV +#include "src/core/lib/iomgr/ev_posix.h" +#endif // GRPC_POSIX_SOCKET_EV + +#include <gtest/gtest.h> + +using grpc::testing::EchoRequest; +using grpc::testing::EchoResponse; +using grpc::testing::kTlsCredentialsType; +using std::chrono::system_clock; + +// MAYBE_SKIP_TEST is a macro to determine if this particular test configuration +// should be skipped based on a decision made at SetUp time. In particular, +// tests that use the callback server can only be run if the iomgr can run in +// the background or if the transport is in-process. +#define MAYBE_SKIP_TEST \ + do { \ + if (do_not_test_) { \ + return; \ + } \ + } while (0) + +namespace grpc { +namespace testing { +namespace { + bool CheckIsLocalhost(const TString& addr) { const TString kIpv6("ipv6:[::1]:"); const TString kIpv4MappedIpv6("ipv6:[::ffff:127.0.0.1]:"); const TString kIpv4("ipv4:127.0.0.1:"); - return addr.substr(0, kIpv4.size()) == kIpv4 || - addr.substr(0, kIpv4MappedIpv6.size()) == kIpv4MappedIpv6 || - addr.substr(0, kIpv6.size()) == kIpv6; -} - + return addr.substr(0, kIpv4.size()) == kIpv4 || + addr.substr(0, kIpv4MappedIpv6.size()) == kIpv4MappedIpv6 || + addr.substr(0, kIpv6.size()) == kIpv6; +} + const int kClientChannelBackupPollIntervalMs = 200; -const char kTestCredsPluginErrorMsg[] = "Could not find plugin metadata."; - +const char kTestCredsPluginErrorMsg[] = "Could not find plugin metadata."; + const char kFakeToken[] = "fake_token"; const char kFakeSelector[] = "fake_selector"; const char kExpectedFakeCredsDebugString[] = @@ -142,738 +142,738 @@ const char kExpectedCompositeCallCredsDebugString[] = "key:call-creds-key1,value:call-creds-val1},TestMetadataCredentials{key:" "call-creds-key2,value:call-creds-val2}}}"; -class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin { - public: - static const char kGoodMetadataKey[]; - static const char kBadMetadataKey[]; - - TestMetadataCredentialsPlugin(const grpc::string_ref& metadata_key, - const grpc::string_ref& metadata_value, - bool is_blocking, bool is_successful, - int delay_ms) - : metadata_key_(metadata_key.data(), metadata_key.length()), - metadata_value_(metadata_value.data(), metadata_value.length()), - is_blocking_(is_blocking), - is_successful_(is_successful), - delay_ms_(delay_ms) {} - - bool IsBlocking() const override { return is_blocking_; } - - Status GetMetadata( - grpc::string_ref service_url, grpc::string_ref method_name, - const grpc::AuthContext& channel_auth_context, +class TestMetadataCredentialsPlugin : public MetadataCredentialsPlugin { + public: + static const char kGoodMetadataKey[]; + static const char kBadMetadataKey[]; + + TestMetadataCredentialsPlugin(const grpc::string_ref& metadata_key, + const grpc::string_ref& metadata_value, + bool is_blocking, bool is_successful, + int delay_ms) + : metadata_key_(metadata_key.data(), metadata_key.length()), + metadata_value_(metadata_value.data(), metadata_value.length()), + is_blocking_(is_blocking), + is_successful_(is_successful), + delay_ms_(delay_ms) {} + + bool IsBlocking() const override { return is_blocking_; } + + Status GetMetadata( + grpc::string_ref service_url, grpc::string_ref method_name, + const grpc::AuthContext& channel_auth_context, std::multimap<TString, TString>* metadata) override { - if (delay_ms_ != 0) { - gpr_sleep_until( - gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(delay_ms_, GPR_TIMESPAN))); - } - EXPECT_GT(service_url.length(), 0UL); - EXPECT_GT(method_name.length(), 0UL); - EXPECT_TRUE(channel_auth_context.IsPeerAuthenticated()); - EXPECT_TRUE(metadata != nullptr); - if (is_successful_) { - metadata->insert(std::make_pair(metadata_key_, metadata_value_)); - return Status::OK; - } else { - return Status(StatusCode::NOT_FOUND, kTestCredsPluginErrorMsg); - } - } - + if (delay_ms_ != 0) { + gpr_sleep_until( + gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(delay_ms_, GPR_TIMESPAN))); + } + EXPECT_GT(service_url.length(), 0UL); + EXPECT_GT(method_name.length(), 0UL); + EXPECT_TRUE(channel_auth_context.IsPeerAuthenticated()); + EXPECT_TRUE(metadata != nullptr); + if (is_successful_) { + metadata->insert(std::make_pair(metadata_key_, metadata_value_)); + return Status::OK; + } else { + return Status(StatusCode::NOT_FOUND, kTestCredsPluginErrorMsg); + } + } + TString DebugString() override { return y_absl::StrFormat("TestMetadataCredentials{key:%s,value:%s}", metadata_key_.c_str(), metadata_value_.c_str()); } - private: + private: TString metadata_key_; TString metadata_value_; - bool is_blocking_; - bool is_successful_; - int delay_ms_; -}; - -const char TestMetadataCredentialsPlugin::kBadMetadataKey[] = - "TestPluginMetadata"; -const char TestMetadataCredentialsPlugin::kGoodMetadataKey[] = - "test-plugin-metadata"; - -class TestAuthMetadataProcessor : public AuthMetadataProcessor { - public: - static const char kGoodGuy[]; - - TestAuthMetadataProcessor(bool is_blocking) : is_blocking_(is_blocking) {} - - std::shared_ptr<CallCredentials> GetCompatibleClientCreds() { - return grpc::MetadataCredentialsFromPlugin( - std::unique_ptr<MetadataCredentialsPlugin>( - new TestMetadataCredentialsPlugin( - TestMetadataCredentialsPlugin::kGoodMetadataKey, kGoodGuy, - is_blocking_, true, 0))); - } - - std::shared_ptr<CallCredentials> GetIncompatibleClientCreds() { - return grpc::MetadataCredentialsFromPlugin( - std::unique_ptr<MetadataCredentialsPlugin>( - new TestMetadataCredentialsPlugin( - TestMetadataCredentialsPlugin::kGoodMetadataKey, "Mr Hyde", - is_blocking_, true, 0))); - } - - // Interface implementation - bool IsBlocking() const override { return is_blocking_; } - - Status Process(const InputMetadata& auth_metadata, AuthContext* context, - OutputMetadata* consumed_auth_metadata, - OutputMetadata* response_metadata) override { - EXPECT_TRUE(consumed_auth_metadata != nullptr); - EXPECT_TRUE(context != nullptr); - EXPECT_TRUE(response_metadata != nullptr); - auto auth_md = - auth_metadata.find(TestMetadataCredentialsPlugin::kGoodMetadataKey); - EXPECT_NE(auth_md, auth_metadata.end()); - string_ref auth_md_value = auth_md->second; - if (auth_md_value == kGoodGuy) { - context->AddProperty(kIdentityPropName, kGoodGuy); - context->SetPeerIdentityPropertyName(kIdentityPropName); - consumed_auth_metadata->insert(std::make_pair( - string(auth_md->first.data(), auth_md->first.length()), - string(auth_md->second.data(), auth_md->second.length()))); - return Status::OK; - } else { - return Status(StatusCode::UNAUTHENTICATED, - string("Invalid principal: ") + - string(auth_md_value.data(), auth_md_value.length())); - } - } - - private: - static const char kIdentityPropName[]; - bool is_blocking_; -}; - -const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll"; -const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity"; - -class Proxy : public ::grpc::testing::EchoTestService::Service { - public: - Proxy(const std::shared_ptr<Channel>& channel) - : stub_(grpc::testing::EchoTestService::NewStub(channel)) {} - - Status Echo(ServerContext* server_context, const EchoRequest* request, - EchoResponse* response) override { - std::unique_ptr<ClientContext> client_context = - ClientContext::FromServerContext(*server_context); - return stub_->Echo(client_context.get(), *request, response); - } - - private: - std::unique_ptr<::grpc::testing::EchoTestService::Stub> stub_; -}; - -class TestServiceImplDupPkg - : public ::grpc::testing::duplicate::EchoTestService::Service { - public: - Status Echo(ServerContext* /*context*/, const EchoRequest* /*request*/, - EchoResponse* response) override { - response->set_message("no package"); - return Status::OK; - } -}; - -class TestScenario { - public: - TestScenario(bool interceptors, bool proxy, bool inproc_stub, + bool is_blocking_; + bool is_successful_; + int delay_ms_; +}; + +const char TestMetadataCredentialsPlugin::kBadMetadataKey[] = + "TestPluginMetadata"; +const char TestMetadataCredentialsPlugin::kGoodMetadataKey[] = + "test-plugin-metadata"; + +class TestAuthMetadataProcessor : public AuthMetadataProcessor { + public: + static const char kGoodGuy[]; + + TestAuthMetadataProcessor(bool is_blocking) : is_blocking_(is_blocking) {} + + std::shared_ptr<CallCredentials> GetCompatibleClientCreds() { + return grpc::MetadataCredentialsFromPlugin( + std::unique_ptr<MetadataCredentialsPlugin>( + new TestMetadataCredentialsPlugin( + TestMetadataCredentialsPlugin::kGoodMetadataKey, kGoodGuy, + is_blocking_, true, 0))); + } + + std::shared_ptr<CallCredentials> GetIncompatibleClientCreds() { + return grpc::MetadataCredentialsFromPlugin( + std::unique_ptr<MetadataCredentialsPlugin>( + new TestMetadataCredentialsPlugin( + TestMetadataCredentialsPlugin::kGoodMetadataKey, "Mr Hyde", + is_blocking_, true, 0))); + } + + // Interface implementation + bool IsBlocking() const override { return is_blocking_; } + + Status Process(const InputMetadata& auth_metadata, AuthContext* context, + OutputMetadata* consumed_auth_metadata, + OutputMetadata* response_metadata) override { + EXPECT_TRUE(consumed_auth_metadata != nullptr); + EXPECT_TRUE(context != nullptr); + EXPECT_TRUE(response_metadata != nullptr); + auto auth_md = + auth_metadata.find(TestMetadataCredentialsPlugin::kGoodMetadataKey); + EXPECT_NE(auth_md, auth_metadata.end()); + string_ref auth_md_value = auth_md->second; + if (auth_md_value == kGoodGuy) { + context->AddProperty(kIdentityPropName, kGoodGuy); + context->SetPeerIdentityPropertyName(kIdentityPropName); + consumed_auth_metadata->insert(std::make_pair( + string(auth_md->first.data(), auth_md->first.length()), + string(auth_md->second.data(), auth_md->second.length()))); + return Status::OK; + } else { + return Status(StatusCode::UNAUTHENTICATED, + string("Invalid principal: ") + + string(auth_md_value.data(), auth_md_value.length())); + } + } + + private: + static const char kIdentityPropName[]; + bool is_blocking_; +}; + +const char TestAuthMetadataProcessor::kGoodGuy[] = "Dr Jekyll"; +const char TestAuthMetadataProcessor::kIdentityPropName[] = "novel identity"; + +class Proxy : public ::grpc::testing::EchoTestService::Service { + public: + Proxy(const std::shared_ptr<Channel>& channel) + : stub_(grpc::testing::EchoTestService::NewStub(channel)) {} + + Status Echo(ServerContext* server_context, const EchoRequest* request, + EchoResponse* response) override { + std::unique_ptr<ClientContext> client_context = + ClientContext::FromServerContext(*server_context); + return stub_->Echo(client_context.get(), *request, response); + } + + private: + std::unique_ptr<::grpc::testing::EchoTestService::Stub> stub_; +}; + +class TestServiceImplDupPkg + : public ::grpc::testing::duplicate::EchoTestService::Service { + public: + Status Echo(ServerContext* /*context*/, const EchoRequest* /*request*/, + EchoResponse* response) override { + response->set_message("no package"); + return Status::OK; + } +}; + +class TestScenario { + public: + TestScenario(bool interceptors, bool proxy, bool inproc_stub, const TString& creds_type, bool use_callback_server) - : use_interceptors(interceptors), - use_proxy(proxy), - inproc(inproc_stub), - credentials_type(creds_type), - callback_server(use_callback_server) {} - void Log() const; - bool use_interceptors; - bool use_proxy; - bool inproc; + : use_interceptors(interceptors), + use_proxy(proxy), + inproc(inproc_stub), + credentials_type(creds_type), + callback_server(use_callback_server) {} + void Log() const; + bool use_interceptors; + bool use_proxy; + bool inproc; const TString credentials_type; - bool callback_server; -}; - -static std::ostream& operator<<(std::ostream& out, - const TestScenario& scenario) { - return out << "TestScenario{use_interceptors=" - << (scenario.use_interceptors ? "true" : "false") - << ", use_proxy=" << (scenario.use_proxy ? "true" : "false") - << ", inproc=" << (scenario.inproc ? "true" : "false") - << ", server_type=" - << (scenario.callback_server ? "callback" : "sync") - << ", credentials='" << scenario.credentials_type << "'}"; -} - -void TestScenario::Log() const { - std::ostringstream out; - out << *this; - gpr_log(GPR_DEBUG, "%s", out.str().c_str()); -} - -class End2endTest : public ::testing::TestWithParam<TestScenario> { - protected: - static void SetUpTestCase() { grpc_init(); } - static void TearDownTestCase() { grpc_shutdown(); } - End2endTest() - : is_server_started_(false), - kMaxMessageSize_(8192), - special_service_("special"), - first_picked_port_(0) { - GetParam().Log(); - } - - void SetUp() override { - if (GetParam().callback_server && !GetParam().inproc && - !grpc_iomgr_run_in_background()) { - do_not_test_ = true; - return; - } - } - - void TearDown() override { - if (is_server_started_) { - server_->Shutdown(); - if (proxy_server_) proxy_server_->Shutdown(); - } - if (first_picked_port_ > 0) { - grpc_recycle_unused_port(first_picked_port_); - } - } - - void StartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) { - int port = grpc_pick_unused_port_or_die(); - first_picked_port_ = port; - server_address_ << "127.0.0.1:" << port; - // Setup server - BuildAndStartServer(processor); - } - - void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) { - if (is_server_started_) { - server_->Shutdown(); - BuildAndStartServer(processor); - } - } - - void BuildAndStartServer( - const std::shared_ptr<AuthMetadataProcessor>& processor) { - ServerBuilder builder; - ConfigureServerBuilder(&builder); - auto server_creds = GetCredentialsProvider()->GetServerCredentials( - GetParam().credentials_type); - if (GetParam().credentials_type != kInsecureCredentialsType) { - server_creds->SetAuthMetadataProcessor(processor); - } - if (GetParam().use_interceptors) { - std::vector< - std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> - creators; - // Add 20 dummy server interceptors - creators.reserve(20); - for (auto i = 0; i < 20; i++) { - creators.push_back(std::unique_ptr<DummyInterceptorFactory>( - new DummyInterceptorFactory())); - } - builder.experimental().SetInterceptorCreators(std::move(creators)); - } - builder.AddListeningPort(server_address_.str(), server_creds); - if (!GetParam().callback_server) { - builder.RegisterService(&service_); - } else { - builder.RegisterService(&callback_service_); - } - builder.RegisterService("foo.test.youtube.com", &special_service_); - builder.RegisterService(&dup_pkg_service_); - - builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4); - builder.SetSyncServerOption( - ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10); - - server_ = builder.BuildAndStart(); - is_server_started_ = true; - } - - virtual void ConfigureServerBuilder(ServerBuilder* builder) { - builder->SetMaxMessageSize( - kMaxMessageSize_); // For testing max message size. - } - - void ResetChannel( - std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> - interceptor_creators = {}) { - if (!is_server_started_) { - StartServer(std::shared_ptr<AuthMetadataProcessor>()); - } - EXPECT_TRUE(is_server_started_); - ChannelArguments args; - auto channel_creds = GetCredentialsProvider()->GetChannelCredentials( - GetParam().credentials_type, &args); - if (!user_agent_prefix_.empty()) { - args.SetUserAgentPrefix(user_agent_prefix_); - } - args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test"); - - if (!GetParam().inproc) { - if (!GetParam().use_interceptors) { - channel_ = ::grpc::CreateCustomChannel(server_address_.str(), - channel_creds, args); - } else { - channel_ = CreateCustomChannelWithInterceptors( - server_address_.str(), channel_creds, args, - interceptor_creators.empty() ? CreateDummyClientInterceptors() - : std::move(interceptor_creators)); - } - } else { - if (!GetParam().use_interceptors) { - channel_ = server_->InProcessChannel(args); - } else { - channel_ = server_->experimental().InProcessChannelWithInterceptors( - args, interceptor_creators.empty() - ? CreateDummyClientInterceptors() - : std::move(interceptor_creators)); - } - } - } - - void ResetStub( - std::vector< - std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> - interceptor_creators = {}) { - ResetChannel(std::move(interceptor_creators)); - if (GetParam().use_proxy) { - proxy_service_.reset(new Proxy(channel_)); - int port = grpc_pick_unused_port_or_die(); - std::ostringstream proxyaddr; - proxyaddr << "localhost:" << port; - ServerBuilder builder; - builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials()); - builder.RegisterService(proxy_service_.get()); - - builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4); - builder.SetSyncServerOption( - ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10); - - proxy_server_ = builder.BuildAndStart(); - - channel_ = - grpc::CreateChannel(proxyaddr.str(), InsecureChannelCredentials()); - } - - stub_ = grpc::testing::EchoTestService::NewStub(channel_); - DummyInterceptor::Reset(); - } - - bool do_not_test_{false}; - bool is_server_started_; - std::shared_ptr<Channel> channel_; - std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; - std::unique_ptr<Server> server_; - std::unique_ptr<Server> proxy_server_; - std::unique_ptr<Proxy> proxy_service_; - std::ostringstream server_address_; - const int kMaxMessageSize_; - TestServiceImpl service_; - CallbackTestServiceImpl callback_service_; - TestServiceImpl special_service_; - TestServiceImplDupPkg dup_pkg_service_; + bool callback_server; +}; + +static std::ostream& operator<<(std::ostream& out, + const TestScenario& scenario) { + return out << "TestScenario{use_interceptors=" + << (scenario.use_interceptors ? "true" : "false") + << ", use_proxy=" << (scenario.use_proxy ? "true" : "false") + << ", inproc=" << (scenario.inproc ? "true" : "false") + << ", server_type=" + << (scenario.callback_server ? "callback" : "sync") + << ", credentials='" << scenario.credentials_type << "'}"; +} + +void TestScenario::Log() const { + std::ostringstream out; + out << *this; + gpr_log(GPR_DEBUG, "%s", out.str().c_str()); +} + +class End2endTest : public ::testing::TestWithParam<TestScenario> { + protected: + static void SetUpTestCase() { grpc_init(); } + static void TearDownTestCase() { grpc_shutdown(); } + End2endTest() + : is_server_started_(false), + kMaxMessageSize_(8192), + special_service_("special"), + first_picked_port_(0) { + GetParam().Log(); + } + + void SetUp() override { + if (GetParam().callback_server && !GetParam().inproc && + !grpc_iomgr_run_in_background()) { + do_not_test_ = true; + return; + } + } + + void TearDown() override { + if (is_server_started_) { + server_->Shutdown(); + if (proxy_server_) proxy_server_->Shutdown(); + } + if (first_picked_port_ > 0) { + grpc_recycle_unused_port(first_picked_port_); + } + } + + void StartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) { + int port = grpc_pick_unused_port_or_die(); + first_picked_port_ = port; + server_address_ << "127.0.0.1:" << port; + // Setup server + BuildAndStartServer(processor); + } + + void RestartServer(const std::shared_ptr<AuthMetadataProcessor>& processor) { + if (is_server_started_) { + server_->Shutdown(); + BuildAndStartServer(processor); + } + } + + void BuildAndStartServer( + const std::shared_ptr<AuthMetadataProcessor>& processor) { + ServerBuilder builder; + ConfigureServerBuilder(&builder); + auto server_creds = GetCredentialsProvider()->GetServerCredentials( + GetParam().credentials_type); + if (GetParam().credentials_type != kInsecureCredentialsType) { + server_creds->SetAuthMetadataProcessor(processor); + } + if (GetParam().use_interceptors) { + std::vector< + std::unique_ptr<experimental::ServerInterceptorFactoryInterface>> + creators; + // Add 20 dummy server interceptors + creators.reserve(20); + for (auto i = 0; i < 20; i++) { + creators.push_back(std::unique_ptr<DummyInterceptorFactory>( + new DummyInterceptorFactory())); + } + builder.experimental().SetInterceptorCreators(std::move(creators)); + } + builder.AddListeningPort(server_address_.str(), server_creds); + if (!GetParam().callback_server) { + builder.RegisterService(&service_); + } else { + builder.RegisterService(&callback_service_); + } + builder.RegisterService("foo.test.youtube.com", &special_service_); + builder.RegisterService(&dup_pkg_service_); + + builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4); + builder.SetSyncServerOption( + ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10); + + server_ = builder.BuildAndStart(); + is_server_started_ = true; + } + + virtual void ConfigureServerBuilder(ServerBuilder* builder) { + builder->SetMaxMessageSize( + kMaxMessageSize_); // For testing max message size. + } + + void ResetChannel( + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + interceptor_creators = {}) { + if (!is_server_started_) { + StartServer(std::shared_ptr<AuthMetadataProcessor>()); + } + EXPECT_TRUE(is_server_started_); + ChannelArguments args; + auto channel_creds = GetCredentialsProvider()->GetChannelCredentials( + GetParam().credentials_type, &args); + if (!user_agent_prefix_.empty()) { + args.SetUserAgentPrefix(user_agent_prefix_); + } + args.SetString(GRPC_ARG_SECONDARY_USER_AGENT_STRING, "end2end_test"); + + if (!GetParam().inproc) { + if (!GetParam().use_interceptors) { + channel_ = ::grpc::CreateCustomChannel(server_address_.str(), + channel_creds, args); + } else { + channel_ = CreateCustomChannelWithInterceptors( + server_address_.str(), channel_creds, args, + interceptor_creators.empty() ? CreateDummyClientInterceptors() + : std::move(interceptor_creators)); + } + } else { + if (!GetParam().use_interceptors) { + channel_ = server_->InProcessChannel(args); + } else { + channel_ = server_->experimental().InProcessChannelWithInterceptors( + args, interceptor_creators.empty() + ? CreateDummyClientInterceptors() + : std::move(interceptor_creators)); + } + } + } + + void ResetStub( + std::vector< + std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + interceptor_creators = {}) { + ResetChannel(std::move(interceptor_creators)); + if (GetParam().use_proxy) { + proxy_service_.reset(new Proxy(channel_)); + int port = grpc_pick_unused_port_or_die(); + std::ostringstream proxyaddr; + proxyaddr << "localhost:" << port; + ServerBuilder builder; + builder.AddListeningPort(proxyaddr.str(), InsecureServerCredentials()); + builder.RegisterService(proxy_service_.get()); + + builder.SetSyncServerOption(ServerBuilder::SyncServerOption::NUM_CQS, 4); + builder.SetSyncServerOption( + ServerBuilder::SyncServerOption::CQ_TIMEOUT_MSEC, 10); + + proxy_server_ = builder.BuildAndStart(); + + channel_ = + grpc::CreateChannel(proxyaddr.str(), InsecureChannelCredentials()); + } + + stub_ = grpc::testing::EchoTestService::NewStub(channel_); + DummyInterceptor::Reset(); + } + + bool do_not_test_{false}; + bool is_server_started_; + std::shared_ptr<Channel> channel_; + std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_; + std::unique_ptr<Server> server_; + std::unique_ptr<Server> proxy_server_; + std::unique_ptr<Proxy> proxy_service_; + std::ostringstream server_address_; + const int kMaxMessageSize_; + TestServiceImpl service_; + CallbackTestServiceImpl callback_service_; + TestServiceImpl special_service_; + TestServiceImplDupPkg dup_pkg_service_; TString user_agent_prefix_; - int first_picked_port_; -}; - -static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs, - bool with_binary_metadata) { - EchoRequest request; - EchoResponse response; - request.set_message("Hello hello hello hello"); - - for (int i = 0; i < num_rpcs; ++i) { - ClientContext context; - if (with_binary_metadata) { - char bytes[8] = {'\0', '\1', '\2', '\3', - '\4', '\5', '\6', static_cast<char>(i)}; + int first_picked_port_; +}; + +static void SendRpc(grpc::testing::EchoTestService::Stub* stub, int num_rpcs, + bool with_binary_metadata) { + EchoRequest request; + EchoResponse response; + request.set_message("Hello hello hello hello"); + + for (int i = 0; i < num_rpcs; ++i) { + ClientContext context; + if (with_binary_metadata) { + char bytes[8] = {'\0', '\1', '\2', '\3', + '\4', '\5', '\6', static_cast<char>(i)}; context.AddMetadata("custom-bin", TString(bytes, 8)); - } - context.set_compression_algorithm(GRPC_COMPRESS_GZIP); - Status s = stub->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); - } -} - -// This class is for testing scenarios where RPCs are cancelled on the server -// by calling ServerContext::TryCancel() -class End2endServerTryCancelTest : public End2endTest { - protected: - // Helper for testing client-streaming RPCs which are cancelled on the server. - // Depending on the value of server_try_cancel parameter, this will test one - // of the following three scenarios: - // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading - // any messages from the client - // - // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading - // messages from the client - // - // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all - // the messages from the client - // - // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. - void TestRequestStreamServerCancel( - ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) { - MAYBE_SKIP_TEST; - RestartServer(std::shared_ptr<AuthMetadataProcessor>()); - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - - // Send server_try_cancel value in the client metadata - context.AddMetadata(kServerTryCancelRequest, + } + context.set_compression_algorithm(GRPC_COMPRESS_GZIP); + Status s = stub->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); + } +} + +// This class is for testing scenarios where RPCs are cancelled on the server +// by calling ServerContext::TryCancel() +class End2endServerTryCancelTest : public End2endTest { + protected: + // Helper for testing client-streaming RPCs which are cancelled on the server. + // Depending on the value of server_try_cancel parameter, this will test one + // of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading + // any messages from the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading + // messages from the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading all + // the messages from the client + // + // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. + void TestRequestStreamServerCancel( + ServerTryCancelRequestPhase server_try_cancel, int num_msgs_to_send) { + MAYBE_SKIP_TEST; + RestartServer(std::shared_ptr<AuthMetadataProcessor>()); + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + // Send server_try_cancel value in the client metadata + context.AddMetadata(kServerTryCancelRequest, ToString(server_try_cancel)); - - auto stream = stub_->RequestStream(&context, &response); - - int num_msgs_sent = 0; - while (num_msgs_sent < num_msgs_to_send) { - request.set_message("hello"); - if (!stream->Write(request)) { - break; - } - num_msgs_sent++; - } - gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent); - - stream->WritesDone(); - Status s = stream->Finish(); - - // At this point, we know for sure that RPC was cancelled by the server - // since we passed server_try_cancel value in the metadata. Depending on the - // value of server_try_cancel, the RPC might have been cancelled by the - // server at different stages. The following validates our expectations of - // number of messages sent in various cancellation scenarios: - - switch (server_try_cancel) { - case CANCEL_BEFORE_PROCESSING: - case CANCEL_DURING_PROCESSING: - // If the RPC is cancelled by server before / during messages from the - // client, it means that the client most likely did not get a chance to - // send all the messages it wanted to send. i.e num_msgs_sent <= - // num_msgs_to_send - EXPECT_LE(num_msgs_sent, num_msgs_to_send); - break; - - case CANCEL_AFTER_PROCESSING: - // If the RPC was cancelled after all messages were read by the server, - // the client did get a chance to send all its messages - EXPECT_EQ(num_msgs_sent, num_msgs_to_send); - break; - - default: - gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", - server_try_cancel); - EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && - server_try_cancel <= CANCEL_AFTER_PROCESSING); - break; - } - - EXPECT_FALSE(s.ok()); - EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); - // Make sure that the server interceptors were notified - if (GetParam().use_interceptors) { - EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); - } - } - - // Helper for testing server-streaming RPCs which are cancelled on the server. - // Depending on the value of server_try_cancel parameter, this will test one - // of the following three scenarios: - // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing - // any messages to the client - // - // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing - // messages to the client - // - // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all - // the messages to the client - // - // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. - void TestResponseStreamServerCancel( - ServerTryCancelRequestPhase server_try_cancel) { - MAYBE_SKIP_TEST; - RestartServer(std::shared_ptr<AuthMetadataProcessor>()); - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - - // Send server_try_cancel in the client metadata - context.AddMetadata(kServerTryCancelRequest, + + auto stream = stub_->RequestStream(&context, &response); + + int num_msgs_sent = 0; + while (num_msgs_sent < num_msgs_to_send) { + request.set_message("hello"); + if (!stream->Write(request)) { + break; + } + num_msgs_sent++; + } + gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent); + + stream->WritesDone(); + Status s = stream->Finish(); + + // At this point, we know for sure that RPC was cancelled by the server + // since we passed server_try_cancel value in the metadata. Depending on the + // value of server_try_cancel, the RPC might have been cancelled by the + // server at different stages. The following validates our expectations of + // number of messages sent in various cancellation scenarios: + + switch (server_try_cancel) { + case CANCEL_BEFORE_PROCESSING: + case CANCEL_DURING_PROCESSING: + // If the RPC is cancelled by server before / during messages from the + // client, it means that the client most likely did not get a chance to + // send all the messages it wanted to send. i.e num_msgs_sent <= + // num_msgs_to_send + EXPECT_LE(num_msgs_sent, num_msgs_to_send); + break; + + case CANCEL_AFTER_PROCESSING: + // If the RPC was cancelled after all messages were read by the server, + // the client did get a chance to send all its messages + EXPECT_EQ(num_msgs_sent, num_msgs_to_send); + break; + + default: + gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", + server_try_cancel); + EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && + server_try_cancel <= CANCEL_AFTER_PROCESSING); + break; + } + + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + // Make sure that the server interceptors were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } + } + + // Helper for testing server-streaming RPCs which are cancelled on the server. + // Depending on the value of server_try_cancel parameter, this will test one + // of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before writing + // any messages to the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while writing + // messages to the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after writing all + // the messages to the client + // + // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. + void TestResponseStreamServerCancel( + ServerTryCancelRequestPhase server_try_cancel) { + MAYBE_SKIP_TEST; + RestartServer(std::shared_ptr<AuthMetadataProcessor>()); + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + // Send server_try_cancel in the client metadata + context.AddMetadata(kServerTryCancelRequest, ToString(server_try_cancel)); - - request.set_message("hello"); - auto stream = stub_->ResponseStream(&context, request); - - int num_msgs_read = 0; - while (num_msgs_read < kServerDefaultResponseStreamsToSend) { - if (!stream->Read(&response)) { - break; - } - EXPECT_EQ(response.message(), + + request.set_message("hello"); + auto stream = stub_->ResponseStream(&context, request); + + int num_msgs_read = 0; + while (num_msgs_read < kServerDefaultResponseStreamsToSend) { + if (!stream->Read(&response)) { + break; + } + EXPECT_EQ(response.message(), request.message() + ToString(num_msgs_read)); - num_msgs_read++; - } - gpr_log(GPR_INFO, "Read %d messages", num_msgs_read); - - Status s = stream->Finish(); - - // Depending on the value of server_try_cancel, the RPC might have been - // cancelled by the server at different stages. The following validates our - // expectations of number of messages read in various cancellation - // scenarios: - switch (server_try_cancel) { - case CANCEL_BEFORE_PROCESSING: - // Server cancelled before sending any messages. Which means the client - // wouldn't have read any - EXPECT_EQ(num_msgs_read, 0); - break; - - case CANCEL_DURING_PROCESSING: - // Server cancelled while writing messages. Client must have read less - // than or equal to the expected number of messages - EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend); - break; - - case CANCEL_AFTER_PROCESSING: - // Even though the Server cancelled after writing all messages, the RPC - // may be cancelled before the Client got a chance to read all the - // messages. - EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend); - break; - - default: { - gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", - server_try_cancel); - EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && - server_try_cancel <= CANCEL_AFTER_PROCESSING); - break; - } - } - - EXPECT_FALSE(s.ok()); - // Make sure that the server interceptors were notified - if (GetParam().use_interceptors) { - EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); - } - } - - // Helper for testing bidirectional-streaming RPCs which are cancelled on the - // server. Depending on the value of server_try_cancel parameter, this will - // test one of the following three scenarios: - // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/ - // writing any messages from/to the client - // - // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/ - // writing messages from/to the client - // - // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing - // all the messages from/to the client - // - // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. - void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel, - int num_messages) { - MAYBE_SKIP_TEST; - RestartServer(std::shared_ptr<AuthMetadataProcessor>()); - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - - // Send server_try_cancel in the client metadata - context.AddMetadata(kServerTryCancelRequest, + num_msgs_read++; + } + gpr_log(GPR_INFO, "Read %d messages", num_msgs_read); + + Status s = stream->Finish(); + + // Depending on the value of server_try_cancel, the RPC might have been + // cancelled by the server at different stages. The following validates our + // expectations of number of messages read in various cancellation + // scenarios: + switch (server_try_cancel) { + case CANCEL_BEFORE_PROCESSING: + // Server cancelled before sending any messages. Which means the client + // wouldn't have read any + EXPECT_EQ(num_msgs_read, 0); + break; + + case CANCEL_DURING_PROCESSING: + // Server cancelled while writing messages. Client must have read less + // than or equal to the expected number of messages + EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend); + break; + + case CANCEL_AFTER_PROCESSING: + // Even though the Server cancelled after writing all messages, the RPC + // may be cancelled before the Client got a chance to read all the + // messages. + EXPECT_LE(num_msgs_read, kServerDefaultResponseStreamsToSend); + break; + + default: { + gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", + server_try_cancel); + EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && + server_try_cancel <= CANCEL_AFTER_PROCESSING); + break; + } + } + + EXPECT_FALSE(s.ok()); + // Make sure that the server interceptors were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } + } + + // Helper for testing bidirectional-streaming RPCs which are cancelled on the + // server. Depending on the value of server_try_cancel parameter, this will + // test one of the following three scenarios: + // CANCEL_BEFORE_PROCESSING: Rpc is cancelled by the server before reading/ + // writing any messages from/to the client + // + // CANCEL_DURING_PROCESSING: Rpc is cancelled by the server while reading/ + // writing messages from/to the client + // + // CANCEL_AFTER PROCESSING: Rpc is cancelled by server after reading/writing + // all the messages from/to the client + // + // NOTE: Do not call this function with server_try_cancel == DO_NOT_CANCEL. + void TestBidiStreamServerCancel(ServerTryCancelRequestPhase server_try_cancel, + int num_messages) { + MAYBE_SKIP_TEST; + RestartServer(std::shared_ptr<AuthMetadataProcessor>()); + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + // Send server_try_cancel in the client metadata + context.AddMetadata(kServerTryCancelRequest, ToString(server_try_cancel)); - - auto stream = stub_->BidiStream(&context); - - int num_msgs_read = 0; - int num_msgs_sent = 0; - while (num_msgs_sent < num_messages) { + + auto stream = stub_->BidiStream(&context); + + int num_msgs_read = 0; + int num_msgs_sent = 0; + while (num_msgs_sent < num_messages) { request.set_message("hello " + ToString(num_msgs_sent)); - if (!stream->Write(request)) { - break; - } - num_msgs_sent++; - - if (!stream->Read(&response)) { - break; - } - num_msgs_read++; - - EXPECT_EQ(response.message(), request.message()); - } - gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent); - gpr_log(GPR_INFO, "Read %d messages", num_msgs_read); - - stream->WritesDone(); - Status s = stream->Finish(); - - // Depending on the value of server_try_cancel, the RPC might have been - // cancelled by the server at different stages. The following validates our - // expectations of number of messages read in various cancellation - // scenarios: - switch (server_try_cancel) { - case CANCEL_BEFORE_PROCESSING: - EXPECT_EQ(num_msgs_read, 0); - break; - - case CANCEL_DURING_PROCESSING: - EXPECT_LE(num_msgs_sent, num_messages); - EXPECT_LE(num_msgs_read, num_msgs_sent); - break; - - case CANCEL_AFTER_PROCESSING: - EXPECT_EQ(num_msgs_sent, num_messages); - - // The Server cancelled after reading the last message and after writing - // the message to the client. However, the RPC cancellation might have - // taken effect before the client actually read the response. - EXPECT_LE(num_msgs_read, num_msgs_sent); - break; - - default: - gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", - server_try_cancel); - EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && - server_try_cancel <= CANCEL_AFTER_PROCESSING); - break; - } - - EXPECT_FALSE(s.ok()); - EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); - // Make sure that the server interceptors were notified - if (GetParam().use_interceptors) { - EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); - } - } -}; - -TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - - context.AddMetadata(kServerTryCancelRequest, + if (!stream->Write(request)) { + break; + } + num_msgs_sent++; + + if (!stream->Read(&response)) { + break; + } + num_msgs_read++; + + EXPECT_EQ(response.message(), request.message()); + } + gpr_log(GPR_INFO, "Sent %d messages", num_msgs_sent); + gpr_log(GPR_INFO, "Read %d messages", num_msgs_read); + + stream->WritesDone(); + Status s = stream->Finish(); + + // Depending on the value of server_try_cancel, the RPC might have been + // cancelled by the server at different stages. The following validates our + // expectations of number of messages read in various cancellation + // scenarios: + switch (server_try_cancel) { + case CANCEL_BEFORE_PROCESSING: + EXPECT_EQ(num_msgs_read, 0); + break; + + case CANCEL_DURING_PROCESSING: + EXPECT_LE(num_msgs_sent, num_messages); + EXPECT_LE(num_msgs_read, num_msgs_sent); + break; + + case CANCEL_AFTER_PROCESSING: + EXPECT_EQ(num_msgs_sent, num_messages); + + // The Server cancelled after reading the last message and after writing + // the message to the client. However, the RPC cancellation might have + // taken effect before the client actually read the response. + EXPECT_LE(num_msgs_read, num_msgs_sent); + break; + + default: + gpr_log(GPR_ERROR, "Invalid server_try_cancel value: %d", + server_try_cancel); + EXPECT_TRUE(server_try_cancel > DO_NOT_CANCEL && + server_try_cancel <= CANCEL_AFTER_PROCESSING); + break; + } + + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + // Make sure that the server interceptors were notified + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } + } +}; + +TEST_P(End2endServerTryCancelTest, RequestEchoServerCancel) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.AddMetadata(kServerTryCancelRequest, ToString(CANCEL_BEFORE_PROCESSING)); - Status s = stub_->Echo(&context, request, &response); - EXPECT_FALSE(s.ok()); - EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); -} - -// Server to cancel before doing reading the request -TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) { - TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1); -} - -// Server to cancel while reading a request from the stream in parallel -TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) { - TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10); -} - -// Server to cancel after reading all the requests but before returning to the -// client -TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) { - TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4); -} - -// Server to cancel before sending any response messages -TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) { - TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING); -} - -// Server to cancel while writing a response to the stream in parallel -TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) { - TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING); -} - -// Server to cancel after writing all the respones to the stream but before -// returning to the client -TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) { - TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING); -} - -// Server to cancel before reading/writing any requests/responses on the stream -TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) { - TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2); -} - -// Server to cancel while reading/writing requests/responses on the stream in -// parallel -TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) { - TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10); -} - -// Server to cancel after reading/writing all requests/responses on the stream -// but before returning to the client -TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) { - TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5); -} - -TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) { - MAYBE_SKIP_TEST; - // User-Agent is an HTTP header for HTTP transports only - if (GetParam().inproc) { - return; - } - user_agent_prefix_ = "custom_prefix"; - ResetStub(); - EchoRequest request; - EchoResponse response; - request.set_message("Hello hello hello hello"); - request.mutable_param()->set_echo_metadata(true); - - ClientContext context; - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); - const auto& trailing_metadata = context.GetServerTrailingMetadata(); - auto iter = trailing_metadata.find("user-agent"); - EXPECT_TRUE(iter != trailing_metadata.end()); + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); +} + +// Server to cancel before doing reading the request +TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelBeforeReads) { + TestRequestStreamServerCancel(CANCEL_BEFORE_PROCESSING, 1); +} + +// Server to cancel while reading a request from the stream in parallel +TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelDuringRead) { + TestRequestStreamServerCancel(CANCEL_DURING_PROCESSING, 10); +} + +// Server to cancel after reading all the requests but before returning to the +// client +TEST_P(End2endServerTryCancelTest, RequestStreamServerCancelAfterReads) { + TestRequestStreamServerCancel(CANCEL_AFTER_PROCESSING, 4); +} + +// Server to cancel before sending any response messages +TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelBefore) { + TestResponseStreamServerCancel(CANCEL_BEFORE_PROCESSING); +} + +// Server to cancel while writing a response to the stream in parallel +TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelDuring) { + TestResponseStreamServerCancel(CANCEL_DURING_PROCESSING); +} + +// Server to cancel after writing all the respones to the stream but before +// returning to the client +TEST_P(End2endServerTryCancelTest, ResponseStreamServerCancelAfter) { + TestResponseStreamServerCancel(CANCEL_AFTER_PROCESSING); +} + +// Server to cancel before reading/writing any requests/responses on the stream +TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelBefore) { + TestBidiStreamServerCancel(CANCEL_BEFORE_PROCESSING, 2); +} + +// Server to cancel while reading/writing requests/responses on the stream in +// parallel +TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelDuring) { + TestBidiStreamServerCancel(CANCEL_DURING_PROCESSING, 10); +} + +// Server to cancel after reading/writing all requests/responses on the stream +// but before returning to the client +TEST_P(End2endServerTryCancelTest, BidiStreamServerCancelAfter) { + TestBidiStreamServerCancel(CANCEL_AFTER_PROCESSING, 5); +} + +TEST_P(End2endTest, SimpleRpcWithCustomUserAgentPrefix) { + MAYBE_SKIP_TEST; + // User-Agent is an HTTP header for HTTP transports only + if (GetParam().inproc) { + return; + } + user_agent_prefix_ = "custom_prefix"; + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello hello hello hello"); + request.mutable_param()->set_echo_metadata(true); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); + const auto& trailing_metadata = context.GetServerTrailingMetadata(); + auto iter = trailing_metadata.find("user-agent"); + EXPECT_TRUE(iter != trailing_metadata.end()); TString expected_prefix = user_agent_prefix_ + " grpc-c++/"; - EXPECT_TRUE(iter->second.starts_with(expected_prefix)) << iter->second; -} - -TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { - MAYBE_SKIP_TEST; - ResetStub(); - std::vector<std::thread> threads; - threads.reserve(10); - for (int i = 0; i < 10; ++i) { - threads.emplace_back(SendRpc, stub_.get(), 10, true); - } - for (int i = 0; i < 10; ++i) { - threads[i].join(); - } -} - -TEST_P(End2endTest, MultipleRpcs) { - MAYBE_SKIP_TEST; - ResetStub(); - std::vector<std::thread> threads; - threads.reserve(10); - for (int i = 0; i < 10; ++i) { - threads.emplace_back(SendRpc, stub_.get(), 10, false); - } - for (int i = 0; i < 10; ++i) { - threads[i].join(); - } -} - + EXPECT_TRUE(iter->second.starts_with(expected_prefix)) << iter->second; +} + +TEST_P(End2endTest, MultipleRpcsWithVariedBinaryMetadataValue) { + MAYBE_SKIP_TEST; + ResetStub(); + std::vector<std::thread> threads; + threads.reserve(10); + for (int i = 0; i < 10; ++i) { + threads.emplace_back(SendRpc, stub_.get(), 10, true); + } + for (int i = 0; i < 10; ++i) { + threads[i].join(); + } +} + +TEST_P(End2endTest, MultipleRpcs) { + MAYBE_SKIP_TEST; + ResetStub(); + std::vector<std::thread> threads; + threads.reserve(10); + for (int i = 0; i < 10; ++i) { + threads.emplace_back(SendRpc, stub_.get(), 10, false); + } + for (int i = 0; i < 10; ++i) { + threads[i].join(); + } +} + TEST_P(End2endTest, ManyStubs) { MAYBE_SKIP_TEST; ResetStub(); @@ -887,37 +887,37 @@ TEST_P(End2endTest, ManyStubs) { EXPECT_GT(peer.registration_attempts(), registration_attempts_pre); } -TEST_P(End2endTest, EmptyBinaryMetadata) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - request.set_message("Hello hello hello hello"); - ClientContext context; - context.AddMetadata("custom-bin", ""); - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); -} - -TEST_P(End2endTest, ReconnectChannel) { - MAYBE_SKIP_TEST; - if (GetParam().inproc) { - return; - } - int poller_slowdown_factor = 1; - // It needs 2 pollset_works to reconnect the channel with polling engine - // "poll" -#ifdef GRPC_POSIX_SOCKET_EV - grpc_core::UniquePtr<char> poller = GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy); - if (0 == strcmp(poller.get(), "poll")) { - poller_slowdown_factor = 2; - } -#endif // GRPC_POSIX_SOCKET_EV - ResetStub(); - SendRpc(stub_.get(), 1, false); - RestartServer(std::shared_ptr<AuthMetadataProcessor>()); - // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to +TEST_P(End2endTest, EmptyBinaryMetadata) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello hello hello hello"); + ClientContext context; + context.AddMetadata("custom-bin", ""); + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); +} + +TEST_P(End2endTest, ReconnectChannel) { + MAYBE_SKIP_TEST; + if (GetParam().inproc) { + return; + } + int poller_slowdown_factor = 1; + // It needs 2 pollset_works to reconnect the channel with polling engine + // "poll" +#ifdef GRPC_POSIX_SOCKET_EV + grpc_core::UniquePtr<char> poller = GPR_GLOBAL_CONFIG_GET(grpc_poll_strategy); + if (0 == strcmp(poller.get(), "poll")) { + poller_slowdown_factor = 2; + } +#endif // GRPC_POSIX_SOCKET_EV + ResetStub(); + SendRpc(stub_.get(), 1, false); + RestartServer(std::shared_ptr<AuthMetadataProcessor>()); + // It needs more than GRPC_CLIENT_CHANNEL_BACKUP_POLL_INTERVAL_MS time to // reconnect the channel. Make it a factor of 5x gpr_sleep_until( gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), @@ -925,309 +925,309 @@ TEST_P(End2endTest, ReconnectChannel) { poller_slowdown_factor * grpc_test_slowdown_factor(), GPR_TIMESPAN))); - SendRpc(stub_.get(), 1, false); -} - -TEST_P(End2endTest, RequestStreamOneRequest) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - - auto stream = stub_->RequestStream(&context, &response); - request.set_message("hello"); - EXPECT_TRUE(stream->Write(request)); - stream->WritesDone(); - Status s = stream->Finish(); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); - EXPECT_TRUE(context.debug_error_string().empty()); -} - -TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - - context.set_initial_metadata_corked(true); - auto stream = stub_->RequestStream(&context, &response); - request.set_message("hello"); - stream->WriteLast(request, WriteOptions()); - Status s = stream->Finish(); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); -} - -TEST_P(End2endTest, RequestStreamTwoRequests) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - - auto stream = stub_->RequestStream(&context, &response); - request.set_message("hello"); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Write(request)); - stream->WritesDone(); - Status s = stream->Finish(); - EXPECT_EQ(response.message(), "hellohello"); - EXPECT_TRUE(s.ok()); -} - -TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - - auto stream = stub_->RequestStream(&context, &response); - request.set_message("hello"); - EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through())); - EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through())); - stream->WritesDone(); - Status s = stream->Finish(); - EXPECT_EQ(response.message(), "hellohello"); - EXPECT_TRUE(s.ok()); -} - -TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - - context.set_initial_metadata_corked(true); - auto stream = stub_->RequestStream(&context, &response); - request.set_message("hello"); - EXPECT_TRUE(stream->Write(request)); - stream->WriteLast(request, WriteOptions()); - Status s = stream->Finish(); - EXPECT_EQ(response.message(), "hellohello"); - EXPECT_TRUE(s.ok()); -} - -TEST_P(End2endTest, ResponseStream) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - request.set_message("hello"); - - auto stream = stub_->ResponseStream(&context, request); - for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) { - EXPECT_TRUE(stream->Read(&response)); + SendRpc(stub_.get(), 1, false); +} + +TEST_P(End2endTest, RequestStreamOneRequest) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + EXPECT_TRUE(stream->Write(request)); + stream->WritesDone(); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); + EXPECT_TRUE(context.debug_error_string().empty()); +} + +TEST_P(End2endTest, RequestStreamOneRequestWithCoalescingApi) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.set_initial_metadata_corked(true); + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + stream->WriteLast(request, WriteOptions()); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); +} + +TEST_P(End2endTest, RequestStreamTwoRequests) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Write(request)); + stream->WritesDone(); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), "hellohello"); + EXPECT_TRUE(s.ok()); +} + +TEST_P(End2endTest, RequestStreamTwoRequestsWithWriteThrough) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through())); + EXPECT_TRUE(stream->Write(request, WriteOptions().set_write_through())); + stream->WritesDone(); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), "hellohello"); + EXPECT_TRUE(s.ok()); +} + +TEST_P(End2endTest, RequestStreamTwoRequestsWithCoalescingApi) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + context.set_initial_metadata_corked(true); + auto stream = stub_->RequestStream(&context, &response); + request.set_message("hello"); + EXPECT_TRUE(stream->Write(request)); + stream->WriteLast(request, WriteOptions()); + Status s = stream->Finish(); + EXPECT_EQ(response.message(), "hellohello"); + EXPECT_TRUE(s.ok()); +} + +TEST_P(End2endTest, ResponseStream) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + + auto stream = stub_->ResponseStream(&context, request); + for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) { + EXPECT_TRUE(stream->Read(&response)); EXPECT_EQ(response.message(), request.message() + ToString(i)); - } - EXPECT_FALSE(stream->Read(&response)); - - Status s = stream->Finish(); - EXPECT_TRUE(s.ok()); -} - -TEST_P(End2endTest, ResponseStreamWithCoalescingApi) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - request.set_message("hello"); - context.AddMetadata(kServerUseCoalescingApi, "1"); - - auto stream = stub_->ResponseStream(&context, request); - for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) { - EXPECT_TRUE(stream->Read(&response)); + } + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + +TEST_P(End2endTest, ResponseStreamWithCoalescingApi) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + context.AddMetadata(kServerUseCoalescingApi, "1"); + + auto stream = stub_->ResponseStream(&context, request); + for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) { + EXPECT_TRUE(stream->Read(&response)); EXPECT_EQ(response.message(), request.message() + ToString(i)); - } - EXPECT_FALSE(stream->Read(&response)); - - Status s = stream->Finish(); - EXPECT_TRUE(s.ok()); -} - -// This was added to prevent regression from issue: -// https://github.com/grpc/grpc/issues/11546 -TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - request.set_message("hello"); - context.AddMetadata(kServerUseCoalescingApi, "1"); - // We will only send one message, forcing everything (init metadata, message, - // trailing) to be coalesced together. - context.AddMetadata(kServerResponseStreamsToSend, "1"); - - auto stream = stub_->ResponseStream(&context, request); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message() + "0"); - - EXPECT_FALSE(stream->Read(&response)); - - Status s = stream->Finish(); - EXPECT_TRUE(s.ok()); -} - -TEST_P(End2endTest, BidiStream) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; + } + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + +// This was added to prevent regression from issue: +// https://github.com/grpc/grpc/issues/11546 +TEST_P(End2endTest, ResponseStreamWithEverythingCoalesced) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + context.AddMetadata(kServerUseCoalescingApi, "1"); + // We will only send one message, forcing everything (init metadata, message, + // trailing) to be coalesced together. + context.AddMetadata(kServerResponseStreamsToSend, "1"); + + auto stream = stub_->ResponseStream(&context, request); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "0"); + + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + +TEST_P(End2endTest, BidiStream) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; TString msg("hello"); - - auto stream = stub_->BidiStream(&context); - - for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) { + + auto stream = stub_->BidiStream(&context); + + for (int i = 0; i < kServerDefaultResponseStreamsToSend; ++i) { request.set_message(msg + ToString(i)); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message()); - } - - stream->WritesDone(); - EXPECT_FALSE(stream->Read(&response)); - EXPECT_FALSE(stream->Read(&response)); - - Status s = stream->Finish(); - EXPECT_TRUE(s.ok()); -} - -TEST_P(End2endTest, BidiStreamWithCoalescingApi) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - context.AddMetadata(kServerFinishAfterNReads, "3"); - context.set_initial_metadata_corked(true); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + } + + stream->WritesDone(); + EXPECT_FALSE(stream->Read(&response)); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + +TEST_P(End2endTest, BidiStreamWithCoalescingApi) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.AddMetadata(kServerFinishAfterNReads, "3"); + context.set_initial_metadata_corked(true); TString msg("hello"); - - auto stream = stub_->BidiStream(&context); - - request.set_message(msg + "0"); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message()); - - request.set_message(msg + "1"); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message()); - - request.set_message(msg + "2"); - stream->WriteLast(request, WriteOptions()); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message()); - - EXPECT_FALSE(stream->Read(&response)); - EXPECT_FALSE(stream->Read(&response)); - - Status s = stream->Finish(); - EXPECT_TRUE(s.ok()); -} - -// This was added to prevent regression from issue: -// https://github.com/grpc/grpc/issues/11546 -TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - context.AddMetadata(kServerFinishAfterNReads, "1"); - context.set_initial_metadata_corked(true); + + auto stream = stub_->BidiStream(&context); + + request.set_message(msg + "0"); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + request.set_message(msg + "1"); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + request.set_message(msg + "2"); + stream->WriteLast(request, WriteOptions()); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + EXPECT_FALSE(stream->Read(&response)); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + +// This was added to prevent regression from issue: +// https://github.com/grpc/grpc/issues/11546 +TEST_P(End2endTest, BidiStreamWithEverythingCoalesced) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.AddMetadata(kServerFinishAfterNReads, "1"); + context.set_initial_metadata_corked(true); TString msg("hello"); - - auto stream = stub_->BidiStream(&context); - - request.set_message(msg + "0"); - stream->WriteLast(request, WriteOptions()); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message()); - - EXPECT_FALSE(stream->Read(&response)); - EXPECT_FALSE(stream->Read(&response)); - - Status s = stream->Finish(); - EXPECT_TRUE(s.ok()); -} - -// Talk to the two services with the same name but different package names. -// The two stubs are created on the same channel. -TEST_P(End2endTest, DiffPackageServices) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - - ClientContext context; - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); - - std::unique_ptr<grpc::testing::duplicate::EchoTestService::Stub> dup_pkg_stub( - grpc::testing::duplicate::EchoTestService::NewStub(channel_)); - ClientContext context2; - s = dup_pkg_stub->Echo(&context2, request, &response); - EXPECT_EQ("no package", response.message()); - EXPECT_TRUE(s.ok()); -} - -template <class ServiceType> -void CancelRpc(ClientContext* context, int delay_us, ServiceType* service) { - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_micros(delay_us, GPR_TIMESPAN))); - while (!service->signal_client()) { - } - context->TryCancel(); -} - -TEST_P(End2endTest, CancelRpcBeforeStart) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - request.set_message("hello"); - context.TryCancel(); - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ("", response.message()); - EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); - if (GetParam().use_interceptors) { - EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); - } -} - + + auto stream = stub_->BidiStream(&context); + + request.set_message(msg + "0"); + stream->WriteLast(request, WriteOptions()); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + EXPECT_FALSE(stream->Read(&response)); + EXPECT_FALSE(stream->Read(&response)); + + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + +// Talk to the two services with the same name but different package names. +// The two stubs are created on the same channel. +TEST_P(End2endTest, DiffPackageServices) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); + + std::unique_ptr<grpc::testing::duplicate::EchoTestService::Stub> dup_pkg_stub( + grpc::testing::duplicate::EchoTestService::NewStub(channel_)); + ClientContext context2; + s = dup_pkg_stub->Echo(&context2, request, &response); + EXPECT_EQ("no package", response.message()); + EXPECT_TRUE(s.ok()); +} + +template <class ServiceType> +void CancelRpc(ClientContext* context, int delay_us, ServiceType* service) { + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_micros(delay_us, GPR_TIMESPAN))); + while (!service->signal_client()) { + } + context->TryCancel(); +} + +TEST_P(End2endTest, CancelRpcBeforeStart) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + context.TryCancel(); + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ("", response.message()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + TEST_P(End2endTest, CancelRpcAfterStart) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - request.set_message("hello"); + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); request.mutable_param()->set_server_notify_client_when_started(true); - request.mutable_param()->set_skip_cancelled_check(true); - Status s; - std::thread echo_thread([this, &s, &context, &request, &response] { - s = stub_->Echo(&context, request, &response); - EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); - }); + request.mutable_param()->set_skip_cancelled_check(true); + Status s; + std::thread echo_thread([this, &s, &context, &request, &response] { + s = stub_->Echo(&context, request, &response); + EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); + }); if (!GetParam().callback_server) { service_.ClientWaitUntilRpcStarted(); } else { callback_service_.ClientWaitUntilRpcStarted(); } - context.TryCancel(); + context.TryCancel(); if (!GetParam().callback_server) { service_.SignalServerToContinue(); @@ -1235,1123 +1235,1123 @@ TEST_P(End2endTest, CancelRpcAfterStart) { callback_service_.SignalServerToContinue(); } - echo_thread.join(); - EXPECT_EQ("", response.message()); - EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); - if (GetParam().use_interceptors) { - EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); - } -} - -// Client cancels request stream after sending two messages -TEST_P(End2endTest, ClientCancelsRequestStream) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - request.set_message("hello"); - - auto stream = stub_->RequestStream(&context, &response); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Write(request)); - - context.TryCancel(); - - Status s = stream->Finish(); - EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); - - EXPECT_EQ(response.message(), ""); - if (GetParam().use_interceptors) { - EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); - } -} - -// Client cancels server stream after sending some messages -TEST_P(End2endTest, ClientCancelsResponseStream) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - request.set_message("hello"); - - auto stream = stub_->ResponseStream(&context, request); - - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message() + "0"); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message() + "1"); - - context.TryCancel(); - - // The cancellation races with responses, so there might be zero or - // one responses pending, read till failure - - if (stream->Read(&response)) { - EXPECT_EQ(response.message(), request.message() + "2"); - // Since we have cancelled, we expect the next attempt to read to fail - EXPECT_FALSE(stream->Read(&response)); - } - - Status s = stream->Finish(); - // The final status could be either of CANCELLED or OK depending on - // who won the race. - EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code()); - if (GetParam().use_interceptors) { - EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); - } -} - -// Client cancels bidi stream after sending some messages -TEST_P(End2endTest, ClientCancelsBidi) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; + echo_thread.join(); + EXPECT_EQ("", response.message()); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +// Client cancels request stream after sending two messages +TEST_P(End2endTest, ClientCancelsRequestStream) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + + auto stream = stub_->RequestStream(&context, &response); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Write(request)); + + context.TryCancel(); + + Status s = stream->Finish(); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + + EXPECT_EQ(response.message(), ""); + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +// Client cancels server stream after sending some messages +TEST_P(End2endTest, ClientCancelsResponseStream) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("hello"); + + auto stream = stub_->ResponseStream(&context, request); + + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "0"); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message() + "1"); + + context.TryCancel(); + + // The cancellation races with responses, so there might be zero or + // one responses pending, read till failure + + if (stream->Read(&response)) { + EXPECT_EQ(response.message(), request.message() + "2"); + // Since we have cancelled, we expect the next attempt to read to fail + EXPECT_FALSE(stream->Read(&response)); + } + + Status s = stream->Finish(); + // The final status could be either of CANCELLED or OK depending on + // who won the race. + EXPECT_GE(grpc::StatusCode::CANCELLED, s.error_code()); + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +// Client cancels bidi stream after sending some messages +TEST_P(End2endTest, ClientCancelsBidi) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; TString msg("hello"); - - auto stream = stub_->BidiStream(&context); - - request.set_message(msg + "0"); - EXPECT_TRUE(stream->Write(request)); - EXPECT_TRUE(stream->Read(&response)); - EXPECT_EQ(response.message(), request.message()); - - request.set_message(msg + "1"); - EXPECT_TRUE(stream->Write(request)); - - context.TryCancel(); - - // The cancellation races with responses, so there might be zero or - // one responses pending, read till failure - - if (stream->Read(&response)) { - EXPECT_EQ(response.message(), request.message()); - // Since we have cancelled, we expect the next attempt to read to fail - EXPECT_FALSE(stream->Read(&response)); - } - - Status s = stream->Finish(); - EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); - if (GetParam().use_interceptors) { - EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); - } -} - -TEST_P(End2endTest, RpcMaxMessageSize) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - request.set_message(string(kMaxMessageSize_ * 2, 'a')); - request.mutable_param()->set_server_die(true); - - ClientContext context; - Status s = stub_->Echo(&context, request, &response); - EXPECT_FALSE(s.ok()); -} - -void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream, - gpr_event* ev) { - EchoResponse resp; - gpr_event_set(ev, (void*)1); - while (stream->Read(&resp)) { - gpr_log(GPR_INFO, "Read message"); - } -} - -// Run a Read and a WritesDone simultaneously. -TEST_P(End2endTest, SimultaneousReadWritesDone) { - MAYBE_SKIP_TEST; - ResetStub(); - ClientContext context; - gpr_event ev; - gpr_event_init(&ev); - auto stream = stub_->BidiStream(&context); - std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev); - gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME)); - stream->WritesDone(); - reader_thread.join(); - Status s = stream->Finish(); - EXPECT_TRUE(s.ok()); -} - -TEST_P(End2endTest, ChannelState) { - MAYBE_SKIP_TEST; - if (GetParam().inproc) { - return; - } - - ResetStub(); - // Start IDLE - EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); - - // Did not ask to connect, no state change. - CompletionQueue cq; - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::milliseconds(10); - channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, nullptr); - void* tag; - bool ok = true; - cq.Next(&tag, &ok); - EXPECT_FALSE(ok); - - EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true)); - EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE, - gpr_inf_future(GPR_CLOCK_REALTIME))); - auto state = channel_->GetState(false); - EXPECT_TRUE(state == GRPC_CHANNEL_CONNECTING || state == GRPC_CHANNEL_READY); -} - -// Takes 10s. -TEST_P(End2endTest, ChannelStateTimeout) { - if ((GetParam().credentials_type != kInsecureCredentialsType) || - GetParam().inproc) { - return; - } - int port = grpc_pick_unused_port_or_die(); - std::ostringstream server_address; - server_address << "127.0.0.1:" << port; - // Channel to non-existing server - auto channel = - grpc::CreateChannel(server_address.str(), InsecureChannelCredentials()); - // Start IDLE - EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true)); - - auto state = GRPC_CHANNEL_IDLE; - for (int i = 0; i < 10; i++) { - channel->WaitForStateChange( - state, std::chrono::system_clock::now() + std::chrono::seconds(1)); - state = channel->GetState(false); - } -} - -// Talking to a non-existing service. -TEST_P(End2endTest, NonExistingService) { - MAYBE_SKIP_TEST; - ResetChannel(); - std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub; - stub = grpc::testing::UnimplementedEchoService::NewStub(channel_); - - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - - ClientContext context; - Status s = stub->Unimplemented(&context, request, &response); - EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code()); - EXPECT_EQ("", s.error_message()); -} - -// Ask the server to send back a serialized proto in trailer. -// This is an example of setting error details. -TEST_P(End2endTest, BinaryTrailerTest) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - - request.mutable_param()->set_echo_metadata(true); - DebugInfo* info = request.mutable_param()->mutable_debug_info(); - info->add_stack_entries("stack_entry_1"); - info->add_stack_entries("stack_entry_2"); - info->add_stack_entries("stack_entry_3"); - info->set_detail("detailed debug info"); + + auto stream = stub_->BidiStream(&context); + + request.set_message(msg + "0"); + EXPECT_TRUE(stream->Write(request)); + EXPECT_TRUE(stream->Read(&response)); + EXPECT_EQ(response.message(), request.message()); + + request.set_message(msg + "1"); + EXPECT_TRUE(stream->Write(request)); + + context.TryCancel(); + + // The cancellation races with responses, so there might be zero or + // one responses pending, read till failure + + if (stream->Read(&response)) { + EXPECT_EQ(response.message(), request.message()); + // Since we have cancelled, we expect the next attempt to read to fail + EXPECT_FALSE(stream->Read(&response)); + } + + Status s = stream->Finish(); + EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code()); + if (GetParam().use_interceptors) { + EXPECT_EQ(20, DummyInterceptor::GetNumTimesCancel()); + } +} + +TEST_P(End2endTest, RpcMaxMessageSize) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message(string(kMaxMessageSize_ * 2, 'a')); + request.mutable_param()->set_server_die(true); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); +} + +void ReaderThreadFunc(ClientReaderWriter<EchoRequest, EchoResponse>* stream, + gpr_event* ev) { + EchoResponse resp; + gpr_event_set(ev, (void*)1); + while (stream->Read(&resp)) { + gpr_log(GPR_INFO, "Read message"); + } +} + +// Run a Read and a WritesDone simultaneously. +TEST_P(End2endTest, SimultaneousReadWritesDone) { + MAYBE_SKIP_TEST; + ResetStub(); + ClientContext context; + gpr_event ev; + gpr_event_init(&ev); + auto stream = stub_->BidiStream(&context); + std::thread reader_thread(ReaderThreadFunc, stream.get(), &ev); + gpr_event_wait(&ev, gpr_inf_future(GPR_CLOCK_REALTIME)); + stream->WritesDone(); + reader_thread.join(); + Status s = stream->Finish(); + EXPECT_TRUE(s.ok()); +} + +TEST_P(End2endTest, ChannelState) { + MAYBE_SKIP_TEST; + if (GetParam().inproc) { + return; + } + + ResetStub(); + // Start IDLE + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(false)); + + // Did not ask to connect, no state change. + CompletionQueue cq; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::milliseconds(10); + channel_->NotifyOnStateChange(GRPC_CHANNEL_IDLE, deadline, &cq, nullptr); + void* tag; + bool ok = true; + cq.Next(&tag, &ok); + EXPECT_FALSE(ok); + + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel_->GetState(true)); + EXPECT_TRUE(channel_->WaitForStateChange(GRPC_CHANNEL_IDLE, + gpr_inf_future(GPR_CLOCK_REALTIME))); + auto state = channel_->GetState(false); + EXPECT_TRUE(state == GRPC_CHANNEL_CONNECTING || state == GRPC_CHANNEL_READY); +} + +// Takes 10s. +TEST_P(End2endTest, ChannelStateTimeout) { + if ((GetParam().credentials_type != kInsecureCredentialsType) || + GetParam().inproc) { + return; + } + int port = grpc_pick_unused_port_or_die(); + std::ostringstream server_address; + server_address << "127.0.0.1:" << port; + // Channel to non-existing server + auto channel = + grpc::CreateChannel(server_address.str(), InsecureChannelCredentials()); + // Start IDLE + EXPECT_EQ(GRPC_CHANNEL_IDLE, channel->GetState(true)); + + auto state = GRPC_CHANNEL_IDLE; + for (int i = 0; i < 10; i++) { + channel->WaitForStateChange( + state, std::chrono::system_clock::now() + std::chrono::seconds(1)); + state = channel->GetState(false); + } +} + +// Talking to a non-existing service. +TEST_P(End2endTest, NonExistingService) { + MAYBE_SKIP_TEST; + ResetChannel(); + std::unique_ptr<grpc::testing::UnimplementedEchoService::Stub> stub; + stub = grpc::testing::UnimplementedEchoService::NewStub(channel_); + + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + + ClientContext context; + Status s = stub->Unimplemented(&context, request, &response); + EXPECT_EQ(StatusCode::UNIMPLEMENTED, s.error_code()); + EXPECT_EQ("", s.error_message()); +} + +// Ask the server to send back a serialized proto in trailer. +// This is an example of setting error details. +TEST_P(End2endTest, BinaryTrailerTest) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + + request.mutable_param()->set_echo_metadata(true); + DebugInfo* info = request.mutable_param()->mutable_debug_info(); + info->add_stack_entries("stack_entry_1"); + info->add_stack_entries("stack_entry_2"); + info->add_stack_entries("stack_entry_3"); + info->set_detail("detailed debug info"); TString expected_string = info->SerializeAsString(); - request.set_message("Hello"); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_FALSE(s.ok()); - auto trailers = context.GetServerTrailingMetadata(); - EXPECT_EQ(1u, trailers.count(kDebugInfoTrailerKey)); - auto iter = trailers.find(kDebugInfoTrailerKey); - EXPECT_EQ(expected_string, iter->second); - // Parse the returned trailer into a DebugInfo proto. - DebugInfo returned_info; - EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second))); -} - -TEST_P(End2endTest, ExpectErrorTest) { - MAYBE_SKIP_TEST; - ResetStub(); - - std::vector<ErrorStatus> expected_status; - expected_status.emplace_back(); - expected_status.back().set_code(13); // INTERNAL - // No Error message or details - - expected_status.emplace_back(); - expected_status.back().set_code(13); // INTERNAL - expected_status.back().set_error_message("text error message"); - expected_status.back().set_binary_error_details("text error details"); - - expected_status.emplace_back(); - expected_status.back().set_code(13); // INTERNAL - expected_status.back().set_error_message("text error message"); - expected_status.back().set_binary_error_details( - "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB"); - - for (auto iter = expected_status.begin(); iter != expected_status.end(); - ++iter) { - EchoRequest request; - EchoResponse response; - ClientContext context; - request.set_message("Hello"); - auto* error = request.mutable_param()->mutable_expected_error(); - error->set_code(iter->code()); - error->set_error_message(iter->error_message()); - error->set_binary_error_details(iter->binary_error_details()); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_FALSE(s.ok()); - EXPECT_EQ(iter->code(), s.error_code()); - EXPECT_EQ(iter->error_message(), s.error_message()); - EXPECT_EQ(iter->binary_error_details(), s.error_details()); - EXPECT_TRUE(context.debug_error_string().find("created") != + request.set_message("Hello"); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + auto trailers = context.GetServerTrailingMetadata(); + EXPECT_EQ(1u, trailers.count(kDebugInfoTrailerKey)); + auto iter = trailers.find(kDebugInfoTrailerKey); + EXPECT_EQ(expected_string, iter->second); + // Parse the returned trailer into a DebugInfo proto. + DebugInfo returned_info; + EXPECT_TRUE(returned_info.ParseFromString(ToString(iter->second))); +} + +TEST_P(End2endTest, ExpectErrorTest) { + MAYBE_SKIP_TEST; + ResetStub(); + + std::vector<ErrorStatus> expected_status; + expected_status.emplace_back(); + expected_status.back().set_code(13); // INTERNAL + // No Error message or details + + expected_status.emplace_back(); + expected_status.back().set_code(13); // INTERNAL + expected_status.back().set_error_message("text error message"); + expected_status.back().set_binary_error_details("text error details"); + + expected_status.emplace_back(); + expected_status.back().set_code(13); // INTERNAL + expected_status.back().set_error_message("text error message"); + expected_status.back().set_binary_error_details( + "\x0\x1\x2\x3\x4\x5\x6\x8\x9\xA\xB"); + + for (auto iter = expected_status.begin(); iter != expected_status.end(); + ++iter) { + EchoRequest request; + EchoResponse response; + ClientContext context; + request.set_message("Hello"); + auto* error = request.mutable_param()->mutable_expected_error(); + error->set_code(iter->code()); + error->set_error_message(iter->error_message()); + error->set_binary_error_details(iter->binary_error_details()); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(iter->code(), s.error_code()); + EXPECT_EQ(iter->error_message(), s.error_message()); + EXPECT_EQ(iter->binary_error_details(), s.error_details()); + EXPECT_TRUE(context.debug_error_string().find("created") != TString::npos); EXPECT_TRUE(context.debug_error_string().find("file") != TString::npos); EXPECT_TRUE(context.debug_error_string().find("line") != TString::npos); - EXPECT_TRUE(context.debug_error_string().find("status") != + EXPECT_TRUE(context.debug_error_string().find("status") != TString::npos); EXPECT_TRUE(context.debug_error_string().find("13") != TString::npos); - } -} - -////////////////////////////////////////////////////////////////////////// -// Test with and without a proxy. -class ProxyEnd2endTest : public End2endTest { - protected: -}; - -TEST_P(ProxyEnd2endTest, SimpleRpc) { - MAYBE_SKIP_TEST; - ResetStub(); - SendRpc(stub_.get(), 1, false); -} - -TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - - ClientContext context; - Status s = stub_->Echo(&context, request, &response); - EXPECT_TRUE(s.ok()); -} - -TEST_P(ProxyEnd2endTest, MultipleRpcs) { - MAYBE_SKIP_TEST; - ResetStub(); - std::vector<std::thread> threads; - threads.reserve(10); - for (int i = 0; i < 10; ++i) { - threads.emplace_back(SendRpc, stub_.get(), 10, false); - } - for (int i = 0; i < 10; ++i) { - threads[i].join(); - } -} - -// Set a 10us deadline and make sure proper error is returned. -TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - request.mutable_param()->set_skip_cancelled_check(true); - // Let server sleep for 40 ms first to guarantee expiry. - // 40 ms might seem a bit extreme but the timer manager would have been just - // initialized (when ResetStub() was called) and there are some warmup costs - // i.e the timer thread many not have even started. There might also be other - // delays in the timer manager thread (in acquiring locks, timer data - // structure manipulations, starting backup timer threads) that add to the - // delays. 40ms is still not enough in some cases but this significantly - // reduces the test flakes - request.mutable_param()->set_server_sleep_us(40 * 1000); - - ClientContext context; - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::milliseconds(1); - context.set_deadline(deadline); - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code()); -} - -// Set a long but finite deadline. -TEST_P(ProxyEnd2endTest, RpcLongDeadline) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - - ClientContext context; - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::hours(1); - context.set_deadline(deadline); - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); -} - -// Ask server to echo back the deadline it sees. -TEST_P(ProxyEnd2endTest, EchoDeadline) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - request.mutable_param()->set_echo_deadline(true); - - ClientContext context; - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(100); - context.set_deadline(deadline); - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); - gpr_timespec sent_deadline; - Timepoint2Timespec(deadline, &sent_deadline); - // We want to allow some reasonable error given: - // - request_deadline() only has 1sec resolution so the best we can do is +-1 - // - if sent_deadline.tv_nsec is very close to the next second's boundary we - // can end up being off by 2 in one direction. - EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 2); - EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1); -} - -// Ask server to echo back the deadline it sees. The rpc has no deadline. -TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - request.mutable_param()->set_echo_deadline(true); - - ClientContext context; - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); - EXPECT_EQ(response.param().request_deadline(), - gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec); -} - -TEST_P(ProxyEnd2endTest, UnimplementedRpc) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - - ClientContext context; - Status s = stub_->Unimplemented(&context, request, &response); - EXPECT_FALSE(s.ok()); - EXPECT_EQ(s.error_code(), grpc::StatusCode::UNIMPLEMENTED); - EXPECT_EQ(s.error_message(), ""); - EXPECT_EQ(response.message(), ""); -} - -// Client cancels rpc after 10ms -TEST_P(ProxyEnd2endTest, ClientCancelsRpc) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - const int kCancelDelayUs = 10 * 1000; - request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs); - - ClientContext context; - std::thread cancel_thread; - if (!GetParam().callback_server) { - cancel_thread = std::thread( - [&context, this](int delay) { CancelRpc(&context, delay, &service_); }, - kCancelDelayUs); - // Note: the unusual pattern above (and below) is caused by a conflict - // between two sets of compiler expectations. clang allows const to be - // captured without mention, so there is no need to capture kCancelDelayUs - // (and indeed clang-tidy complains if you do so). OTOH, a Windows compiler - // in our tests requires an explicit capture even for const. We square this - // circle by passing the const value in as an argument to the lambda. - } else { - cancel_thread = std::thread( - [&context, this](int delay) { - CancelRpc(&context, delay, &callback_service_); - }, - kCancelDelayUs); - } - Status s = stub_->Echo(&context, request, &response); - cancel_thread.join(); - EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); - EXPECT_EQ(s.error_message(), "Cancelled"); -} - -// Server cancels rpc after 1ms -TEST_P(ProxyEnd2endTest, ServerCancelsRpc) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - request.mutable_param()->set_server_cancel_after_us(1000); - - ClientContext context; - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); - EXPECT_TRUE(s.error_message().empty()); -} - -// Make the response larger than the flow control window. -TEST_P(ProxyEnd2endTest, HugeResponse) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - request.set_message("huge response"); - const size_t kResponseSize = 1024 * (1024 + 10); - request.mutable_param()->set_response_message_length(kResponseSize); - - ClientContext context; - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::seconds(20); - context.set_deadline(deadline); - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(kResponseSize, response.message().size()); - EXPECT_TRUE(s.ok()); -} - -TEST_P(ProxyEnd2endTest, Peer) { - MAYBE_SKIP_TEST; - // Peer is not meaningful for inproc - if (GetParam().inproc) { - return; - } - ResetStub(); - EchoRequest request; - EchoResponse response; - request.set_message("hello"); - request.mutable_param()->set_echo_peer(true); - - ClientContext context; - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); - EXPECT_TRUE(CheckIsLocalhost(response.param().peer())); - EXPECT_TRUE(CheckIsLocalhost(context.peer())); -} - -////////////////////////////////////////////////////////////////////////// -class SecureEnd2endTest : public End2endTest { - protected: - SecureEnd2endTest() { - GPR_ASSERT(!GetParam().use_proxy); - GPR_ASSERT(GetParam().credentials_type != kInsecureCredentialsType); - } -}; - -TEST_P(SecureEnd2endTest, SimpleRpcWithHost) { - MAYBE_SKIP_TEST; - ResetStub(); - - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - - ClientContext context; - context.set_authority("foo.test.youtube.com"); - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(response.has_param()); - EXPECT_EQ("special", response.param().host()); - EXPECT_TRUE(s.ok()); -} - -bool MetadataContains( - const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, + } +} + +////////////////////////////////////////////////////////////////////////// +// Test with and without a proxy. +class ProxyEnd2endTest : public End2endTest { + protected: +}; + +TEST_P(ProxyEnd2endTest, SimpleRpc) { + MAYBE_SKIP_TEST; + ResetStub(); + SendRpc(stub_.get(), 1, false); +} + +TEST_P(ProxyEnd2endTest, SimpleRpcWithEmptyMessages) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_TRUE(s.ok()); +} + +TEST_P(ProxyEnd2endTest, MultipleRpcs) { + MAYBE_SKIP_TEST; + ResetStub(); + std::vector<std::thread> threads; + threads.reserve(10); + for (int i = 0; i < 10; ++i) { + threads.emplace_back(SendRpc, stub_.get(), 10, false); + } + for (int i = 0; i < 10; ++i) { + threads[i].join(); + } +} + +// Set a 10us deadline and make sure proper error is returned. +TEST_P(ProxyEnd2endTest, RpcDeadlineExpires) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + request.mutable_param()->set_skip_cancelled_check(true); + // Let server sleep for 40 ms first to guarantee expiry. + // 40 ms might seem a bit extreme but the timer manager would have been just + // initialized (when ResetStub() was called) and there are some warmup costs + // i.e the timer thread many not have even started. There might also be other + // delays in the timer manager thread (in acquiring locks, timer data + // structure manipulations, starting backup timer threads) that add to the + // delays. 40ms is still not enough in some cases but this significantly + // reduces the test flakes + request.mutable_param()->set_server_sleep_us(40 * 1000); + + ClientContext context; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::milliseconds(1); + context.set_deadline(deadline); + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(StatusCode::DEADLINE_EXCEEDED, s.error_code()); +} + +// Set a long but finite deadline. +TEST_P(ProxyEnd2endTest, RpcLongDeadline) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + + ClientContext context; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::hours(1); + context.set_deadline(deadline); + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); +} + +// Ask server to echo back the deadline it sees. +TEST_P(ProxyEnd2endTest, EchoDeadline) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + request.mutable_param()->set_echo_deadline(true); + + ClientContext context; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::seconds(100); + context.set_deadline(deadline); + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); + gpr_timespec sent_deadline; + Timepoint2Timespec(deadline, &sent_deadline); + // We want to allow some reasonable error given: + // - request_deadline() only has 1sec resolution so the best we can do is +-1 + // - if sent_deadline.tv_nsec is very close to the next second's boundary we + // can end up being off by 2 in one direction. + EXPECT_LE(response.param().request_deadline() - sent_deadline.tv_sec, 2); + EXPECT_GE(response.param().request_deadline() - sent_deadline.tv_sec, -1); +} + +// Ask server to echo back the deadline it sees. The rpc has no deadline. +TEST_P(ProxyEnd2endTest, EchoDeadlineForNoDeadlineRpc) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + request.mutable_param()->set_echo_deadline(true); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(response.param().request_deadline(), + gpr_inf_future(GPR_CLOCK_REALTIME).tv_sec); +} + +TEST_P(ProxyEnd2endTest, UnimplementedRpc) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + + ClientContext context; + Status s = stub_->Unimplemented(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(s.error_code(), grpc::StatusCode::UNIMPLEMENTED); + EXPECT_EQ(s.error_message(), ""); + EXPECT_EQ(response.message(), ""); +} + +// Client cancels rpc after 10ms +TEST_P(ProxyEnd2endTest, ClientCancelsRpc) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + const int kCancelDelayUs = 10 * 1000; + request.mutable_param()->set_client_cancel_after_us(kCancelDelayUs); + + ClientContext context; + std::thread cancel_thread; + if (!GetParam().callback_server) { + cancel_thread = std::thread( + [&context, this](int delay) { CancelRpc(&context, delay, &service_); }, + kCancelDelayUs); + // Note: the unusual pattern above (and below) is caused by a conflict + // between two sets of compiler expectations. clang allows const to be + // captured without mention, so there is no need to capture kCancelDelayUs + // (and indeed clang-tidy complains if you do so). OTOH, a Windows compiler + // in our tests requires an explicit capture even for const. We square this + // circle by passing the const value in as an argument to the lambda. + } else { + cancel_thread = std::thread( + [&context, this](int delay) { + CancelRpc(&context, delay, &callback_service_); + }, + kCancelDelayUs); + } + Status s = stub_->Echo(&context, request, &response); + cancel_thread.join(); + EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); + EXPECT_EQ(s.error_message(), "Cancelled"); +} + +// Server cancels rpc after 1ms +TEST_P(ProxyEnd2endTest, ServerCancelsRpc) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + request.mutable_param()->set_server_cancel_after_us(1000); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(StatusCode::CANCELLED, s.error_code()); + EXPECT_TRUE(s.error_message().empty()); +} + +// Make the response larger than the flow control window. +TEST_P(ProxyEnd2endTest, HugeResponse) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("huge response"); + const size_t kResponseSize = 1024 * (1024 + 10); + request.mutable_param()->set_response_message_length(kResponseSize); + + ClientContext context; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::seconds(20); + context.set_deadline(deadline); + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(kResponseSize, response.message().size()); + EXPECT_TRUE(s.ok()); +} + +TEST_P(ProxyEnd2endTest, Peer) { + MAYBE_SKIP_TEST; + // Peer is not meaningful for inproc + if (GetParam().inproc) { + return; + } + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("hello"); + request.mutable_param()->set_echo_peer(true); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); + EXPECT_TRUE(CheckIsLocalhost(response.param().peer())); + EXPECT_TRUE(CheckIsLocalhost(context.peer())); +} + +////////////////////////////////////////////////////////////////////////// +class SecureEnd2endTest : public End2endTest { + protected: + SecureEnd2endTest() { + GPR_ASSERT(!GetParam().use_proxy); + GPR_ASSERT(GetParam().credentials_type != kInsecureCredentialsType); + } +}; + +TEST_P(SecureEnd2endTest, SimpleRpcWithHost) { + MAYBE_SKIP_TEST; + ResetStub(); + + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + + ClientContext context; + context.set_authority("foo.test.youtube.com"); + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(response.has_param()); + EXPECT_EQ("special", response.param().host()); + EXPECT_TRUE(s.ok()); +} + +bool MetadataContains( + const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, const TString& key, const TString& value) { - int count = 0; - - for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter = - metadata.begin(); - iter != metadata.end(); ++iter) { - if (ToString(iter->first) == key && ToString(iter->second) == value) { - count++; - } - } - return count == 1; -} - -TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) { - MAYBE_SKIP_TEST; - auto* processor = new TestAuthMetadataProcessor(true); - StartServer(std::shared_ptr<AuthMetadataProcessor>(processor)); - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - context.set_credentials(processor->GetCompatibleClientCreds()); - request.set_message("Hello"); - request.mutable_param()->set_echo_metadata(true); - request.mutable_param()->set_expected_client_identity( - TestAuthMetadataProcessor::kGoodGuy); - request.mutable_param()->set_expected_transport_security_type( - GetParam().credentials_type); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(request.message(), response.message()); - EXPECT_TRUE(s.ok()); - - // Metadata should have been consumed by the processor. - EXPECT_FALSE(MetadataContains( - context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY, + int count = 0; + + for (std::multimap<grpc::string_ref, grpc::string_ref>::const_iterator iter = + metadata.begin(); + iter != metadata.end(); ++iter) { + if (ToString(iter->first) == key && ToString(iter->second) == value) { + count++; + } + } + return count == 1; +} + +TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorSuccess) { + MAYBE_SKIP_TEST; + auto* processor = new TestAuthMetadataProcessor(true); + StartServer(std::shared_ptr<AuthMetadataProcessor>(processor)); + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_credentials(processor->GetCompatibleClientCreds()); + request.set_message("Hello"); + request.mutable_param()->set_echo_metadata(true); + request.mutable_param()->set_expected_client_identity( + TestAuthMetadataProcessor::kGoodGuy); + request.mutable_param()->set_expected_transport_security_type( + GetParam().credentials_type); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(request.message(), response.message()); + EXPECT_TRUE(s.ok()); + + // Metadata should have been consumed by the processor. + EXPECT_FALSE(MetadataContains( + context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY, TString("Bearer ") + TestAuthMetadataProcessor::kGoodGuy)); -} - -TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) { - MAYBE_SKIP_TEST; - auto* processor = new TestAuthMetadataProcessor(true); - StartServer(std::shared_ptr<AuthMetadataProcessor>(processor)); - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - context.set_credentials(processor->GetIncompatibleClientCreds()); - request.set_message("Hello"); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_FALSE(s.ok()); - EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED); -} - -TEST_P(SecureEnd2endTest, SetPerCallCredentials) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - std::shared_ptr<CallCredentials> creds = +} + +TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginAndProcessorFailure) { + MAYBE_SKIP_TEST; + auto* processor = new TestAuthMetadataProcessor(true); + StartServer(std::shared_ptr<AuthMetadataProcessor>(processor)); + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_credentials(processor->GetIncompatibleClientCreds()); + request.set_message("Hello"); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED); +} + +TEST_P(SecureEnd2endTest, SetPerCallCredentials) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + std::shared_ptr<CallCredentials> creds = GoogleIAMCredentials(kFakeToken, kFakeSelector); - context.set_credentials(creds); - request.set_message("Hello"); - request.mutable_param()->set_echo_metadata(true); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(request.message(), response.message()); - EXPECT_TRUE(s.ok()); - EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, + context.set_credentials(creds); + request.set_message("Hello"); + request.mutable_param()->set_echo_metadata(true); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(request.message(), response.message()); + EXPECT_TRUE(s.ok()); + EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, kFakeToken)); - EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, + EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, kFakeSelector)); EXPECT_EQ(context.credentials()->DebugString(), kExpectedFakeCredsDebugString); -} - -class CredentialsInterceptor : public experimental::Interceptor { - public: - CredentialsInterceptor(experimental::ClientRpcInfo* info) : info_(info) {} - - void Intercept(experimental::InterceptorBatchMethods* methods) { - if (methods->QueryInterceptionHookPoint( - experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) { - std::shared_ptr<CallCredentials> creds = +} + +class CredentialsInterceptor : public experimental::Interceptor { + public: + CredentialsInterceptor(experimental::ClientRpcInfo* info) : info_(info) {} + + void Intercept(experimental::InterceptorBatchMethods* methods) { + if (methods->QueryInterceptionHookPoint( + experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) { + std::shared_ptr<CallCredentials> creds = GoogleIAMCredentials(kFakeToken, kFakeSelector); - info_->client_context()->set_credentials(creds); - } - methods->Proceed(); - } - - private: - experimental::ClientRpcInfo* info_ = nullptr; -}; - -class CredentialsInterceptorFactory - : public experimental::ClientInterceptorFactoryInterface { - CredentialsInterceptor* CreateClientInterceptor( - experimental::ClientRpcInfo* info) { - return new CredentialsInterceptor(info); - } -}; - -TEST_P(SecureEnd2endTest, CallCredentialsInterception) { - MAYBE_SKIP_TEST; - if (!GetParam().use_interceptors) { - return; - } - std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> - interceptor_creators; - interceptor_creators.push_back(std::unique_ptr<CredentialsInterceptorFactory>( - new CredentialsInterceptorFactory())); - ResetStub(std::move(interceptor_creators)); - EchoRequest request; - EchoResponse response; - ClientContext context; - - request.set_message("Hello"); - request.mutable_param()->set_echo_metadata(true); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(request.message(), response.message()); - EXPECT_TRUE(s.ok()); - EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, + info_->client_context()->set_credentials(creds); + } + methods->Proceed(); + } + + private: + experimental::ClientRpcInfo* info_ = nullptr; +}; + +class CredentialsInterceptorFactory + : public experimental::ClientInterceptorFactoryInterface { + CredentialsInterceptor* CreateClientInterceptor( + experimental::ClientRpcInfo* info) { + return new CredentialsInterceptor(info); + } +}; + +TEST_P(SecureEnd2endTest, CallCredentialsInterception) { + MAYBE_SKIP_TEST; + if (!GetParam().use_interceptors) { + return; + } + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + interceptor_creators; + interceptor_creators.push_back(std::unique_ptr<CredentialsInterceptorFactory>( + new CredentialsInterceptorFactory())); + ResetStub(std::move(interceptor_creators)); + EchoRequest request; + EchoResponse response; + ClientContext context; + + request.set_message("Hello"); + request.mutable_param()->set_echo_metadata(true); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(request.message(), response.message()); + EXPECT_TRUE(s.ok()); + EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, kFakeToken)); - EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, + EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, kFakeSelector)); EXPECT_EQ(context.credentials()->DebugString(), kExpectedFakeCredsDebugString); -} - -TEST_P(SecureEnd2endTest, CallCredentialsInterceptionWithSetCredentials) { - MAYBE_SKIP_TEST; - if (!GetParam().use_interceptors) { - return; - } - std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> - interceptor_creators; - interceptor_creators.push_back(std::unique_ptr<CredentialsInterceptorFactory>( - new CredentialsInterceptorFactory())); - ResetStub(std::move(interceptor_creators)); - EchoRequest request; - EchoResponse response; - ClientContext context; - std::shared_ptr<CallCredentials> creds1 = +} + +TEST_P(SecureEnd2endTest, CallCredentialsInterceptionWithSetCredentials) { + MAYBE_SKIP_TEST; + if (!GetParam().use_interceptors) { + return; + } + std::vector<std::unique_ptr<experimental::ClientInterceptorFactoryInterface>> + interceptor_creators; + interceptor_creators.push_back(std::unique_ptr<CredentialsInterceptorFactory>( + new CredentialsInterceptorFactory())); + ResetStub(std::move(interceptor_creators)); + EchoRequest request; + EchoResponse response; + ClientContext context; + std::shared_ptr<CallCredentials> creds1 = GoogleIAMCredentials(kWrongToken, kWrongSelector); - context.set_credentials(creds1); - EXPECT_EQ(context.credentials(), creds1); + context.set_credentials(creds1); + EXPECT_EQ(context.credentials(), creds1); EXPECT_EQ(context.credentials()->DebugString(), kExpectedWrongCredsDebugString); - request.set_message("Hello"); - request.mutable_param()->set_echo_metadata(true); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(request.message(), response.message()); - EXPECT_TRUE(s.ok()); - EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, + request.set_message("Hello"); + request.mutable_param()->set_echo_metadata(true); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(request.message(), response.message()); + EXPECT_TRUE(s.ok()); + EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, kFakeToken)); - EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, + EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, kFakeSelector)); EXPECT_EQ(context.credentials()->DebugString(), kExpectedFakeCredsDebugString); -} - -TEST_P(SecureEnd2endTest, OverridePerCallCredentials) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - std::shared_ptr<CallCredentials> creds1 = +} + +TEST_P(SecureEnd2endTest, OverridePerCallCredentials) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + std::shared_ptr<CallCredentials> creds1 = GoogleIAMCredentials(kFakeToken1, kFakeSelector1); - context.set_credentials(creds1); - EXPECT_EQ(context.credentials(), creds1); + context.set_credentials(creds1); + EXPECT_EQ(context.credentials(), creds1); EXPECT_EQ(context.credentials()->DebugString(), kExpectedFakeCreds1DebugString); - std::shared_ptr<CallCredentials> creds2 = + std::shared_ptr<CallCredentials> creds2 = GoogleIAMCredentials(kFakeToken2, kFakeSelector2); - context.set_credentials(creds2); - EXPECT_EQ(context.credentials(), creds2); - request.set_message("Hello"); - request.mutable_param()->set_echo_metadata(true); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, + context.set_credentials(creds2); + EXPECT_EQ(context.credentials(), creds2); + request.set_message("Hello"); + request.mutable_param()->set_echo_metadata(true); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, kFakeToken2)); - EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, + EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, kFakeSelector2)); - EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, + EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORIZATION_TOKEN_METADATA_KEY, kFakeToken1)); - EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(), - GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, + EXPECT_FALSE(MetadataContains(context.GetServerTrailingMetadata(), + GRPC_IAM_AUTHORITY_SELECTOR_METADATA_KEY, kFakeSelector1)); EXPECT_EQ(context.credentials()->DebugString(), kExpectedFakeCreds2DebugString); - EXPECT_EQ(request.message(), response.message()); - EXPECT_TRUE(s.ok()); -} - -TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - context.set_credentials(grpc::MetadataCredentialsFromPlugin( - std::unique_ptr<MetadataCredentialsPlugin>( - new TestMetadataCredentialsPlugin( - TestMetadataCredentialsPlugin::kBadMetadataKey, - "Does not matter, will fail the key is invalid.", false, true, - 0)))); - request.set_message("Hello"); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_FALSE(s.ok()); - EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE); + EXPECT_EQ(request.message(), response.message()); + EXPECT_TRUE(s.ok()); +} + +TEST_P(SecureEnd2endTest, AuthMetadataPluginKeyFailure) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_credentials(grpc::MetadataCredentialsFromPlugin( + std::unique_ptr<MetadataCredentialsPlugin>( + new TestMetadataCredentialsPlugin( + TestMetadataCredentialsPlugin::kBadMetadataKey, + "Does not matter, will fail the key is invalid.", false, true, + 0)))); + request.set_message("Hello"); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE); EXPECT_EQ(context.credentials()->DebugString(), kExpectedAuthMetadataPluginKeyFailureCredsDebugString); -} - -TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - context.set_credentials(grpc::MetadataCredentialsFromPlugin( - std::unique_ptr<MetadataCredentialsPlugin>( - new TestMetadataCredentialsPlugin( - TestMetadataCredentialsPlugin::kGoodMetadataKey, - "With illegal \n value.", false, true, 0)))); - request.set_message("Hello"); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_FALSE(s.ok()); - EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE); +} + +TEST_P(SecureEnd2endTest, AuthMetadataPluginValueFailure) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_credentials(grpc::MetadataCredentialsFromPlugin( + std::unique_ptr<MetadataCredentialsPlugin>( + new TestMetadataCredentialsPlugin( + TestMetadataCredentialsPlugin::kGoodMetadataKey, + "With illegal \n value.", false, true, 0)))); + request.set_message("Hello"); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE); EXPECT_EQ(context.credentials()->DebugString(), kExpectedAuthMetadataPluginValueFailureCredsDebugString); -} - -TEST_P(SecureEnd2endTest, AuthMetadataPluginWithDeadline) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - request.mutable_param()->set_skip_cancelled_check(true); - EchoResponse response; - ClientContext context; - const int delay = 100; - std::chrono::system_clock::time_point deadline = - std::chrono::system_clock::now() + std::chrono::milliseconds(delay); - context.set_deadline(deadline); - context.set_credentials(grpc::MetadataCredentialsFromPlugin( - std::unique_ptr<MetadataCredentialsPlugin>( - new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true, - true, delay)))); - request.set_message("Hello"); - - Status s = stub_->Echo(&context, request, &response); - if (!s.ok()) { - EXPECT_TRUE(s.error_code() == StatusCode::DEADLINE_EXCEEDED || - s.error_code() == StatusCode::UNAVAILABLE); - } +} + +TEST_P(SecureEnd2endTest, AuthMetadataPluginWithDeadline) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + request.mutable_param()->set_skip_cancelled_check(true); + EchoResponse response; + ClientContext context; + const int delay = 100; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::milliseconds(delay); + context.set_deadline(deadline); + context.set_credentials(grpc::MetadataCredentialsFromPlugin( + std::unique_ptr<MetadataCredentialsPlugin>( + new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true, + true, delay)))); + request.set_message("Hello"); + + Status s = stub_->Echo(&context, request, &response); + if (!s.ok()) { + EXPECT_TRUE(s.error_code() == StatusCode::DEADLINE_EXCEEDED || + s.error_code() == StatusCode::UNAVAILABLE); + } EXPECT_EQ(context.credentials()->DebugString(), kExpectedAuthMetadataPluginWithDeadlineCredsDebugString); -} - -TEST_P(SecureEnd2endTest, AuthMetadataPluginWithCancel) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - request.mutable_param()->set_skip_cancelled_check(true); - EchoResponse response; - ClientContext context; - const int delay = 100; - context.set_credentials(grpc::MetadataCredentialsFromPlugin( - std::unique_ptr<MetadataCredentialsPlugin>( - new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true, - true, delay)))); - request.set_message("Hello"); - - std::thread cancel_thread([&] { - gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), - gpr_time_from_millis(delay, GPR_TIMESPAN))); - context.TryCancel(); - }); - Status s = stub_->Echo(&context, request, &response); - if (!s.ok()) { - EXPECT_TRUE(s.error_code() == StatusCode::CANCELLED || - s.error_code() == StatusCode::UNAVAILABLE); - } - cancel_thread.join(); +} + +TEST_P(SecureEnd2endTest, AuthMetadataPluginWithCancel) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + request.mutable_param()->set_skip_cancelled_check(true); + EchoResponse response; + ClientContext context; + const int delay = 100; + context.set_credentials(grpc::MetadataCredentialsFromPlugin( + std::unique_ptr<MetadataCredentialsPlugin>( + new TestMetadataCredentialsPlugin("meta_key", "Does not matter", true, + true, delay)))); + request.set_message("Hello"); + + std::thread cancel_thread([&] { + gpr_sleep_until(gpr_time_add(gpr_now(GPR_CLOCK_REALTIME), + gpr_time_from_millis(delay, GPR_TIMESPAN))); + context.TryCancel(); + }); + Status s = stub_->Echo(&context, request, &response); + if (!s.ok()) { + EXPECT_TRUE(s.error_code() == StatusCode::CANCELLED || + s.error_code() == StatusCode::UNAVAILABLE); + } + cancel_thread.join(); EXPECT_EQ(context.credentials()->DebugString(), kExpectedAuthMetadataPluginWithDeadlineCredsDebugString); -} - -TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - context.set_credentials(grpc::MetadataCredentialsFromPlugin( - std::unique_ptr<MetadataCredentialsPlugin>( - new TestMetadataCredentialsPlugin( - TestMetadataCredentialsPlugin::kGoodMetadataKey, - "Does not matter, will fail anyway (see 3rd param)", false, false, - 0)))); - request.set_message("Hello"); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_FALSE(s.ok()); - EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE); - EXPECT_EQ(s.error_message(), +} + +TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginFailure) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_credentials(grpc::MetadataCredentialsFromPlugin( + std::unique_ptr<MetadataCredentialsPlugin>( + new TestMetadataCredentialsPlugin( + TestMetadataCredentialsPlugin::kGoodMetadataKey, + "Does not matter, will fail anyway (see 3rd param)", false, false, + 0)))); + request.set_message("Hello"); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE); + EXPECT_EQ(s.error_message(), TString("Getting metadata from plugin failed with error: ") + - kTestCredsPluginErrorMsg); + kTestCredsPluginErrorMsg); EXPECT_EQ(context.credentials()->DebugString(), kExpectedNonBlockingAuthMetadataPluginFailureCredsDebugString); -} - -TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) { - MAYBE_SKIP_TEST; - auto* processor = new TestAuthMetadataProcessor(false); - StartServer(std::shared_ptr<AuthMetadataProcessor>(processor)); - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - context.set_credentials(processor->GetCompatibleClientCreds()); - request.set_message("Hello"); - request.mutable_param()->set_echo_metadata(true); - request.mutable_param()->set_expected_client_identity( - TestAuthMetadataProcessor::kGoodGuy); - request.mutable_param()->set_expected_transport_security_type( - GetParam().credentials_type); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(request.message(), response.message()); - EXPECT_TRUE(s.ok()); - - // Metadata should have been consumed by the processor. - EXPECT_FALSE(MetadataContains( - context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY, +} + +TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorSuccess) { + MAYBE_SKIP_TEST; + auto* processor = new TestAuthMetadataProcessor(false); + StartServer(std::shared_ptr<AuthMetadataProcessor>(processor)); + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_credentials(processor->GetCompatibleClientCreds()); + request.set_message("Hello"); + request.mutable_param()->set_echo_metadata(true); + request.mutable_param()->set_expected_client_identity( + TestAuthMetadataProcessor::kGoodGuy); + request.mutable_param()->set_expected_transport_security_type( + GetParam().credentials_type); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(request.message(), response.message()); + EXPECT_TRUE(s.ok()); + + // Metadata should have been consumed by the processor. + EXPECT_FALSE(MetadataContains( + context.GetServerTrailingMetadata(), GRPC_AUTHORIZATION_METADATA_KEY, TString("Bearer ") + TestAuthMetadataProcessor::kGoodGuy)); EXPECT_EQ( context.credentials()->DebugString(), kExpectedNonBlockingAuthMetadataPluginAndProcessorSuccessCredsDebugString); -} - -TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) { - MAYBE_SKIP_TEST; - auto* processor = new TestAuthMetadataProcessor(false); - StartServer(std::shared_ptr<AuthMetadataProcessor>(processor)); - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - context.set_credentials(processor->GetIncompatibleClientCreds()); - request.set_message("Hello"); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_FALSE(s.ok()); - EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED); +} + +TEST_P(SecureEnd2endTest, NonBlockingAuthMetadataPluginAndProcessorFailure) { + MAYBE_SKIP_TEST; + auto* processor = new TestAuthMetadataProcessor(false); + StartServer(std::shared_ptr<AuthMetadataProcessor>(processor)); + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_credentials(processor->GetIncompatibleClientCreds()); + request.set_message("Hello"); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(s.error_code(), StatusCode::UNAUTHENTICATED); EXPECT_EQ( context.credentials()->DebugString(), kExpectedNonBlockingAuthMetadataPluginAndProcessorFailureCredsDebugString); -} - -TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - context.set_credentials(grpc::MetadataCredentialsFromPlugin( - std::unique_ptr<MetadataCredentialsPlugin>( - new TestMetadataCredentialsPlugin( - TestMetadataCredentialsPlugin::kGoodMetadataKey, - "Does not matter, will fail anyway (see 3rd param)", true, false, - 0)))); - request.set_message("Hello"); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_FALSE(s.ok()); - EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE); - EXPECT_EQ(s.error_message(), +} + +TEST_P(SecureEnd2endTest, BlockingAuthMetadataPluginFailure) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + context.set_credentials(grpc::MetadataCredentialsFromPlugin( + std::unique_ptr<MetadataCredentialsPlugin>( + new TestMetadataCredentialsPlugin( + TestMetadataCredentialsPlugin::kGoodMetadataKey, + "Does not matter, will fail anyway (see 3rd param)", true, false, + 0)))); + request.set_message("Hello"); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_FALSE(s.ok()); + EXPECT_EQ(s.error_code(), StatusCode::UNAVAILABLE); + EXPECT_EQ(s.error_message(), TString("Getting metadata from plugin failed with error: ") + - kTestCredsPluginErrorMsg); + kTestCredsPluginErrorMsg); EXPECT_EQ(context.credentials()->DebugString(), kExpectedBlockingAuthMetadataPluginFailureCredsDebugString); -} - -TEST_P(SecureEnd2endTest, CompositeCallCreds) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - ClientContext context; - const char kMetadataKey1[] = "call-creds-key1"; - const char kMetadataKey2[] = "call-creds-key2"; - const char kMetadataVal1[] = "call-creds-val1"; - const char kMetadataVal2[] = "call-creds-val2"; - - context.set_credentials(grpc::CompositeCallCredentials( - grpc::MetadataCredentialsFromPlugin( - std::unique_ptr<MetadataCredentialsPlugin>( - new TestMetadataCredentialsPlugin(kMetadataKey1, kMetadataVal1, - true, true, 0))), - grpc::MetadataCredentialsFromPlugin( - std::unique_ptr<MetadataCredentialsPlugin>( - new TestMetadataCredentialsPlugin(kMetadataKey2, kMetadataVal2, - true, true, 0))))); - request.set_message("Hello"); - request.mutable_param()->set_echo_metadata(true); - - Status s = stub_->Echo(&context, request, &response); - EXPECT_TRUE(s.ok()); - EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), - kMetadataKey1, kMetadataVal1)); - EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), - kMetadataKey2, kMetadataVal2)); +} + +TEST_P(SecureEnd2endTest, CompositeCallCreds) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + ClientContext context; + const char kMetadataKey1[] = "call-creds-key1"; + const char kMetadataKey2[] = "call-creds-key2"; + const char kMetadataVal1[] = "call-creds-val1"; + const char kMetadataVal2[] = "call-creds-val2"; + + context.set_credentials(grpc::CompositeCallCredentials( + grpc::MetadataCredentialsFromPlugin( + std::unique_ptr<MetadataCredentialsPlugin>( + new TestMetadataCredentialsPlugin(kMetadataKey1, kMetadataVal1, + true, true, 0))), + grpc::MetadataCredentialsFromPlugin( + std::unique_ptr<MetadataCredentialsPlugin>( + new TestMetadataCredentialsPlugin(kMetadataKey2, kMetadataVal2, + true, true, 0))))); + request.set_message("Hello"); + request.mutable_param()->set_echo_metadata(true); + + Status s = stub_->Echo(&context, request, &response); + EXPECT_TRUE(s.ok()); + EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), + kMetadataKey1, kMetadataVal1)); + EXPECT_TRUE(MetadataContains(context.GetServerTrailingMetadata(), + kMetadataKey2, kMetadataVal2)); EXPECT_EQ(context.credentials()->DebugString(), kExpectedCompositeCallCredsDebugString); -} - -TEST_P(SecureEnd2endTest, ClientAuthContext) { - MAYBE_SKIP_TEST; - ResetStub(); - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - request.mutable_param()->set_check_auth_context(GetParam().credentials_type == - kTlsCredentialsType); - request.mutable_param()->set_expected_transport_security_type( - GetParam().credentials_type); - ClientContext context; - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); - - std::shared_ptr<const AuthContext> auth_ctx = context.auth_context(); - std::vector<grpc::string_ref> tst = - auth_ctx->FindPropertyValues("transport_security_type"); - ASSERT_EQ(1u, tst.size()); - EXPECT_EQ(GetParam().credentials_type, ToString(tst[0])); - if (GetParam().credentials_type == kTlsCredentialsType) { - EXPECT_EQ("x509_subject_alternative_name", - auth_ctx->GetPeerIdentityPropertyName()); - EXPECT_EQ(4u, auth_ctx->GetPeerIdentity().size()); - EXPECT_EQ("*.test.google.fr", ToString(auth_ctx->GetPeerIdentity()[0])); - EXPECT_EQ("waterzooi.test.google.be", - ToString(auth_ctx->GetPeerIdentity()[1])); - EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2])); - EXPECT_EQ("192.168.1.3", ToString(auth_ctx->GetPeerIdentity()[3])); - } -} - -class ResourceQuotaEnd2endTest : public End2endTest { - public: - ResourceQuotaEnd2endTest() - : server_resource_quota_("server_resource_quota") {} - - virtual void ConfigureServerBuilder(ServerBuilder* builder) override { - builder->SetResourceQuota(server_resource_quota_); - } - - private: - ResourceQuota server_resource_quota_; -}; - -TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) { - MAYBE_SKIP_TEST; - ResetStub(); - - EchoRequest request; - EchoResponse response; - request.set_message("Hello"); - - ClientContext context; - Status s = stub_->Echo(&context, request, &response); - EXPECT_EQ(response.message(), request.message()); - EXPECT_TRUE(s.ok()); -} - -// TODO(vjpai): refactor arguments into a struct if it makes sense -std::vector<TestScenario> CreateTestScenarios(bool use_proxy, - bool test_insecure, - bool test_secure, - bool test_inproc, - bool test_callback_server) { - std::vector<TestScenario> scenarios; +} + +TEST_P(SecureEnd2endTest, ClientAuthContext) { + MAYBE_SKIP_TEST; + ResetStub(); + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + request.mutable_param()->set_check_auth_context(GetParam().credentials_type == + kTlsCredentialsType); + request.mutable_param()->set_expected_transport_security_type( + GetParam().credentials_type); + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); + + std::shared_ptr<const AuthContext> auth_ctx = context.auth_context(); + std::vector<grpc::string_ref> tst = + auth_ctx->FindPropertyValues("transport_security_type"); + ASSERT_EQ(1u, tst.size()); + EXPECT_EQ(GetParam().credentials_type, ToString(tst[0])); + if (GetParam().credentials_type == kTlsCredentialsType) { + EXPECT_EQ("x509_subject_alternative_name", + auth_ctx->GetPeerIdentityPropertyName()); + EXPECT_EQ(4u, auth_ctx->GetPeerIdentity().size()); + EXPECT_EQ("*.test.google.fr", ToString(auth_ctx->GetPeerIdentity()[0])); + EXPECT_EQ("waterzooi.test.google.be", + ToString(auth_ctx->GetPeerIdentity()[1])); + EXPECT_EQ("*.test.youtube.com", ToString(auth_ctx->GetPeerIdentity()[2])); + EXPECT_EQ("192.168.1.3", ToString(auth_ctx->GetPeerIdentity()[3])); + } +} + +class ResourceQuotaEnd2endTest : public End2endTest { + public: + ResourceQuotaEnd2endTest() + : server_resource_quota_("server_resource_quota") {} + + virtual void ConfigureServerBuilder(ServerBuilder* builder) override { + builder->SetResourceQuota(server_resource_quota_); + } + + private: + ResourceQuota server_resource_quota_; +}; + +TEST_P(ResourceQuotaEnd2endTest, SimpleRequest) { + MAYBE_SKIP_TEST; + ResetStub(); + + EchoRequest request; + EchoResponse response; + request.set_message("Hello"); + + ClientContext context; + Status s = stub_->Echo(&context, request, &response); + EXPECT_EQ(response.message(), request.message()); + EXPECT_TRUE(s.ok()); +} + +// TODO(vjpai): refactor arguments into a struct if it makes sense +std::vector<TestScenario> CreateTestScenarios(bool use_proxy, + bool test_insecure, + bool test_secure, + bool test_inproc, + bool test_callback_server) { + std::vector<TestScenario> scenarios; std::vector<TString> credentials_types; - + GPR_GLOBAL_CONFIG_SET(grpc_client_channel_backup_poll_interval_ms, kClientChannelBackupPollIntervalMs); -#if TARGET_OS_IPHONE - // Workaround Apple CFStream bug - gpr_setenv("grpc_cfstream", "0"); -#endif - - if (test_secure) { - credentials_types = - GetCredentialsProvider()->GetSecureCredentialsTypeList(); - } - auto insec_ok = [] { - // Only allow insecure credentials type when it is registered with the - // provider. User may create providers that do not have insecure. - return GetCredentialsProvider()->GetChannelCredentials( - kInsecureCredentialsType, nullptr) != nullptr; - }; - if (test_insecure && insec_ok()) { - credentials_types.push_back(kInsecureCredentialsType); - } - - // Test callback with inproc or if the event-engine allows it - GPR_ASSERT(!credentials_types.empty()); - for (const auto& cred : credentials_types) { - scenarios.emplace_back(false, false, false, cred, false); - scenarios.emplace_back(true, false, false, cred, false); - if (test_callback_server) { - // Note that these scenarios will be dynamically disabled if the event - // engine doesn't run in the background - scenarios.emplace_back(false, false, false, cred, true); - scenarios.emplace_back(true, false, false, cred, true); - } - if (use_proxy) { - scenarios.emplace_back(false, true, false, cred, false); - scenarios.emplace_back(true, true, false, cred, false); - } - } - if (test_inproc && insec_ok()) { - scenarios.emplace_back(false, false, true, kInsecureCredentialsType, false); - scenarios.emplace_back(true, false, true, kInsecureCredentialsType, false); - if (test_callback_server) { - scenarios.emplace_back(false, false, true, kInsecureCredentialsType, - true); - scenarios.emplace_back(true, false, true, kInsecureCredentialsType, true); - } - } - return scenarios; -} - -INSTANTIATE_TEST_SUITE_P( - End2end, End2endTest, - ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true))); - -INSTANTIATE_TEST_SUITE_P( - End2endServerTryCancel, End2endServerTryCancelTest, - ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true))); - -INSTANTIATE_TEST_SUITE_P( - ProxyEnd2end, ProxyEnd2endTest, - ::testing::ValuesIn(CreateTestScenarios(true, true, true, true, true))); - -INSTANTIATE_TEST_SUITE_P( - SecureEnd2end, SecureEnd2endTest, - ::testing::ValuesIn(CreateTestScenarios(false, false, true, false, true))); - -INSTANTIATE_TEST_SUITE_P( - ResourceQuotaEnd2end, ResourceQuotaEnd2endTest, - ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true))); - -} // namespace -} // namespace testing -} // namespace grpc - -int main(int argc, char** argv) { - grpc::testing::TestEnvironment env(argc, argv); - ::testing::InitGoogleTest(&argc, argv); - int ret = RUN_ALL_TESTS(); - return ret; -} +#if TARGET_OS_IPHONE + // Workaround Apple CFStream bug + gpr_setenv("grpc_cfstream", "0"); +#endif + + if (test_secure) { + credentials_types = + GetCredentialsProvider()->GetSecureCredentialsTypeList(); + } + auto insec_ok = [] { + // Only allow insecure credentials type when it is registered with the + // provider. User may create providers that do not have insecure. + return GetCredentialsProvider()->GetChannelCredentials( + kInsecureCredentialsType, nullptr) != nullptr; + }; + if (test_insecure && insec_ok()) { + credentials_types.push_back(kInsecureCredentialsType); + } + + // Test callback with inproc or if the event-engine allows it + GPR_ASSERT(!credentials_types.empty()); + for (const auto& cred : credentials_types) { + scenarios.emplace_back(false, false, false, cred, false); + scenarios.emplace_back(true, false, false, cred, false); + if (test_callback_server) { + // Note that these scenarios will be dynamically disabled if the event + // engine doesn't run in the background + scenarios.emplace_back(false, false, false, cred, true); + scenarios.emplace_back(true, false, false, cred, true); + } + if (use_proxy) { + scenarios.emplace_back(false, true, false, cred, false); + scenarios.emplace_back(true, true, false, cred, false); + } + } + if (test_inproc && insec_ok()) { + scenarios.emplace_back(false, false, true, kInsecureCredentialsType, false); + scenarios.emplace_back(true, false, true, kInsecureCredentialsType, false); + if (test_callback_server) { + scenarios.emplace_back(false, false, true, kInsecureCredentialsType, + true); + scenarios.emplace_back(true, false, true, kInsecureCredentialsType, true); + } + } + return scenarios; +} + +INSTANTIATE_TEST_SUITE_P( + End2end, End2endTest, + ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true))); + +INSTANTIATE_TEST_SUITE_P( + End2endServerTryCancel, End2endServerTryCancelTest, + ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true))); + +INSTANTIATE_TEST_SUITE_P( + ProxyEnd2end, ProxyEnd2endTest, + ::testing::ValuesIn(CreateTestScenarios(true, true, true, true, true))); + +INSTANTIATE_TEST_SUITE_P( + SecureEnd2end, SecureEnd2endTest, + ::testing::ValuesIn(CreateTestScenarios(false, false, true, false, true))); + +INSTANTIATE_TEST_SUITE_P( + ResourceQuotaEnd2end, ResourceQuotaEnd2endTest, + ::testing::ValuesIn(CreateTestScenarios(false, true, true, true, true))); + +} // namespace +} // namespace testing +} // namespace grpc + +int main(int argc, char** argv) { + grpc::testing::TestEnvironment env(argc, argv); + ::testing::InitGoogleTest(&argc, argv); + int ret = RUN_ALL_TESTS(); + return ret; +} |