diff options
author | Devtools Arcadia <arcadia-devtools@yandex-team.ru> | 2022-02-07 18:08:42 +0300 |
---|---|---|
committer | Devtools Arcadia <arcadia-devtools@mous.vla.yp-c.yandex.net> | 2022-02-07 18:08:42 +0300 |
commit | 1110808a9d39d4b808aef724c861a2e1a38d2a69 (patch) | |
tree | e26c9fed0de5d9873cce7e00bc214573dc2195b7 /contrib/libs/grpc/test/cpp/end2end/cfstream_test.cc | |
download | ydb-1110808a9d39d4b808aef724c861a2e1a38d2a69.tar.gz |
intermediate changes
ref:cde9a383711a11544ce7e107a78147fb96cc4029
Diffstat (limited to 'contrib/libs/grpc/test/cpp/end2end/cfstream_test.cc')
-rw-r--r-- | contrib/libs/grpc/test/cpp/end2end/cfstream_test.cc | 496 |
1 files changed, 496 insertions, 0 deletions
diff --git a/contrib/libs/grpc/test/cpp/end2end/cfstream_test.cc b/contrib/libs/grpc/test/cpp/end2end/cfstream_test.cc new file mode 100644 index 0000000000..e6695982bd --- /dev/null +++ b/contrib/libs/grpc/test/cpp/end2end/cfstream_test.cc @@ -0,0 +1,496 @@ +/* + * + * Copyright 2019 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "src/core/lib/iomgr/port.h" + +#include <algorithm> +#include <memory> +#include <mutex> +#include <random> +#include <thread> + +#include <grpc/grpc.h> +#include <grpc/support/alloc.h> +#include <grpc/support/atm.h> +#include <grpc/support/log.h> +#include <grpc/support/string_util.h> +#include <grpc/support/time.h> +#include <grpcpp/channel.h> +#include <grpcpp/client_context.h> +#include <grpcpp/create_channel.h> +#include <grpcpp/health_check_service_interface.h> +#include <grpcpp/server.h> +#include <grpcpp/server_builder.h> +#include <gtest/gtest.h> + +#include "src/core/lib/backoff/backoff.h" +#include "src/core/lib/gpr/env.h" + +#include "src/proto/grpc/testing/echo.grpc.pb.h" +#include "test/core/util/debugger_macros.h" +#include "test/core/util/port.h" +#include "test/core/util/test_config.h" +#include "test/cpp/end2end/test_service_impl.h" +#include "test/cpp/util/test_credentials_provider.h" + +#ifdef GRPC_CFSTREAM +using grpc::ClientAsyncResponseReader; +using grpc::testing::EchoRequest; +using grpc::testing::EchoResponse; +using grpc::testing::RequestParams; +using std::chrono::system_clock; + +namespace grpc { +namespace testing { +namespace { + +struct TestScenario { + TestScenario(const TString& creds_type, const TString& content) + : credentials_type(creds_type), message_content(content) {} + const TString credentials_type; + const TString message_content; +}; + +class CFStreamTest : public ::testing::TestWithParam<TestScenario> { + protected: + CFStreamTest() + : server_host_("grpctest"), + interface_("lo0"), + ipv4_address_("10.0.0.1") {} + + void DNSUp() { + std::ostringstream cmd; + // Add DNS entry for server_host_ in /etc/hosts + cmd << "echo '" << ipv4_address_ << " " << server_host_ + << " ' | sudo tee -a /etc/hosts"; + std::system(cmd.str().c_str()); + } + + void DNSDown() { + std::ostringstream cmd; + // Remove DNS entry for server_host_ in /etc/hosts + cmd << "sudo sed -i '.bak' '/" << server_host_ << "/d' /etc/hosts"; + std::system(cmd.str().c_str()); + } + + void InterfaceUp() { + std::ostringstream cmd; + cmd << "sudo /sbin/ifconfig " << interface_ << " alias " << ipv4_address_; + std::system(cmd.str().c_str()); + } + + void InterfaceDown() { + std::ostringstream cmd; + cmd << "sudo /sbin/ifconfig " << interface_ << " -alias " << ipv4_address_; + std::system(cmd.str().c_str()); + } + + void NetworkUp() { + gpr_log(GPR_DEBUG, "Bringing network up"); + InterfaceUp(); + DNSUp(); + } + + void NetworkDown() { + gpr_log(GPR_DEBUG, "Bringing network down"); + InterfaceDown(); + DNSDown(); + } + + void SetUp() override { + NetworkUp(); + grpc_init(); + StartServer(); + } + + void TearDown() override { + NetworkDown(); + StopServer(); + grpc_shutdown(); + } + + void StartServer() { + port_ = grpc_pick_unused_port_or_die(); + server_.reset(new ServerData(port_, GetParam().credentials_type)); + server_->Start(server_host_); + } + void StopServer() { server_->Shutdown(); } + + std::unique_ptr<grpc::testing::EchoTestService::Stub> BuildStub( + const std::shared_ptr<Channel>& channel) { + return grpc::testing::EchoTestService::NewStub(channel); + } + + std::shared_ptr<Channel> BuildChannel() { + std::ostringstream server_address; + server_address << server_host_ << ":" << port_; + ChannelArguments args; + auto channel_creds = GetCredentialsProvider()->GetChannelCredentials( + GetParam().credentials_type, &args); + return CreateCustomChannel(server_address.str(), channel_creds, args); + } + + int GetStreamID(ClientContext& context) { + int stream_id = 0; + grpc_call* call = context.c_call(); + if (call) { + grpc_chttp2_stream* stream = grpc_chttp2_stream_from_call(call); + if (stream) { + stream_id = stream->id; + } + } + return stream_id; + } + + void SendRpc( + const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub, + bool expect_success = false) { + auto response = std::unique_ptr<EchoResponse>(new EchoResponse()); + EchoRequest request; + auto& msg = GetParam().message_content; + request.set_message(msg); + ClientContext context; + Status status = stub->Echo(&context, request, response.get()); + int stream_id = GetStreamID(context); + if (status.ok()) { + gpr_log(GPR_DEBUG, "RPC with stream_id %d succeeded", stream_id); + EXPECT_EQ(msg, response->message()); + } else { + gpr_log(GPR_DEBUG, "RPC with stream_id %d failed: %s", stream_id, + status.error_message().c_str()); + } + if (expect_success) { + EXPECT_TRUE(status.ok()); + } + } + void SendAsyncRpc( + const std::unique_ptr<grpc::testing::EchoTestService::Stub>& stub, + RequestParams param = RequestParams()) { + EchoRequest request; + request.set_message(GetParam().message_content); + *request.mutable_param() = std::move(param); + AsyncClientCall* call = new AsyncClientCall; + + call->response_reader = + stub->PrepareAsyncEcho(&call->context, request, &cq_); + + call->response_reader->StartCall(); + call->response_reader->Finish(&call->reply, &call->status, (void*)call); + } + + void ShutdownCQ() { cq_.Shutdown(); } + + bool CQNext(void** tag, bool* ok) { + auto deadline = std::chrono::system_clock::now() + std::chrono::seconds(10); + auto ret = cq_.AsyncNext(tag, ok, deadline); + if (ret == grpc::CompletionQueue::GOT_EVENT) { + return true; + } else if (ret == grpc::CompletionQueue::SHUTDOWN) { + return false; + } else { + GPR_ASSERT(ret == grpc::CompletionQueue::TIMEOUT); + // This can happen if we hit the Apple CFStream bug which results in the + // read stream hanging. We are ignoring hangs and timeouts, but these + // tests are still useful as they can catch memory memory corruptions, + // crashes and other bugs that don't result in test hang/timeout. + return false; + } + } + + bool WaitForChannelNotReady(Channel* channel, int timeout_seconds = 5) { + const gpr_timespec deadline = + grpc_timeout_seconds_to_deadline(timeout_seconds); + grpc_connectivity_state state; + while ((state = channel->GetState(false /* try_to_connect */)) == + GRPC_CHANNEL_READY) { + if (!channel->WaitForStateChange(state, deadline)) return false; + } + return true; + } + + bool WaitForChannelReady(Channel* channel, int timeout_seconds = 10) { + const gpr_timespec deadline = + grpc_timeout_seconds_to_deadline(timeout_seconds); + grpc_connectivity_state state; + while ((state = channel->GetState(true /* try_to_connect */)) != + GRPC_CHANNEL_READY) { + if (!channel->WaitForStateChange(state, deadline)) return false; + } + return true; + } + + struct AsyncClientCall { + EchoResponse reply; + ClientContext context; + Status status; + std::unique_ptr<ClientAsyncResponseReader<EchoResponse>> response_reader; + }; + + private: + struct ServerData { + int port_; + const TString creds_; + std::unique_ptr<Server> server_; + TestServiceImpl service_; + std::unique_ptr<std::thread> thread_; + bool server_ready_ = false; + + ServerData(int port, const TString& creds) + : port_(port), creds_(creds) {} + + void Start(const TString& server_host) { + gpr_log(GPR_INFO, "starting server on port %d", port_); + std::mutex mu; + std::unique_lock<std::mutex> lock(mu); + std::condition_variable cond; + thread_.reset(new std::thread( + std::bind(&ServerData::Serve, this, server_host, &mu, &cond))); + cond.wait(lock, [this] { return server_ready_; }); + server_ready_ = false; + gpr_log(GPR_INFO, "server startup complete"); + } + + void Serve(const TString& server_host, std::mutex* mu, + std::condition_variable* cond) { + std::ostringstream server_address; + server_address << server_host << ":" << port_; + ServerBuilder builder; + auto server_creds = + GetCredentialsProvider()->GetServerCredentials(creds_); + builder.AddListeningPort(server_address.str(), server_creds); + builder.RegisterService(&service_); + server_ = builder.BuildAndStart(); + std::lock_guard<std::mutex> lock(*mu); + server_ready_ = true; + cond->notify_one(); + } + + void Shutdown(bool join = true) { + server_->Shutdown(grpc_timeout_milliseconds_to_deadline(0)); + if (join) thread_->join(); + } + }; + + CompletionQueue cq_; + const TString server_host_; + const TString interface_; + const TString ipv4_address_; + std::unique_ptr<ServerData> server_; + int port_; +}; + +std::vector<TestScenario> CreateTestScenarios() { + std::vector<TestScenario> scenarios; + std::vector<TString> credentials_types; + std::vector<TString> messages; + + credentials_types.push_back(kInsecureCredentialsType); + auto sec_list = GetCredentialsProvider()->GetSecureCredentialsTypeList(); + for (auto sec = sec_list.begin(); sec != sec_list.end(); sec++) { + credentials_types.push_back(*sec); + } + + messages.push_back("🖖"); + for (size_t k = 1; k < GRPC_DEFAULT_MAX_RECV_MESSAGE_LENGTH / 1024; k *= 32) { + TString big_msg; + for (size_t i = 0; i < k * 1024; ++i) { + char c = 'a' + (i % 26); + big_msg += c; + } + messages.push_back(big_msg); + } + for (auto cred = credentials_types.begin(); cred != credentials_types.end(); + ++cred) { + for (auto msg = messages.begin(); msg != messages.end(); msg++) { + scenarios.emplace_back(*cred, *msg); + } + } + + return scenarios; +} + +INSTANTIATE_TEST_SUITE_P(CFStreamTest, CFStreamTest, + ::testing::ValuesIn(CreateTestScenarios())); + +// gRPC should automatically detech network flaps (without enabling keepalives) +// when CFStream is enabled +TEST_P(CFStreamTest, NetworkTransition) { + auto channel = BuildChannel(); + auto stub = BuildStub(channel); + // Channel should be in READY state after we send an RPC + SendRpc(stub, /*expect_success=*/true); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); + + std::atomic_bool shutdown{false}; + std::thread sender = std::thread([this, &stub, &shutdown]() { + while (true) { + if (shutdown.load()) { + return; + } + SendRpc(stub); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + }); + + // bring down network + NetworkDown(); + + // network going down should be detected by cfstream + EXPECT_TRUE(WaitForChannelNotReady(channel.get())); + + // bring network interface back up + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + NetworkUp(); + + // channel should reconnect + EXPECT_TRUE(WaitForChannelReady(channel.get())); + EXPECT_EQ(channel->GetState(false), GRPC_CHANNEL_READY); + shutdown.store(true); + sender.join(); +} + +// Network flaps while RPCs are in flight +TEST_P(CFStreamTest, NetworkFlapRpcsInFlight) { + auto channel = BuildChannel(); + auto stub = BuildStub(channel); + std::atomic_int rpcs_sent{0}; + + // Channel should be in READY state after we send some RPCs + for (int i = 0; i < 10; ++i) { + RequestParams param; + param.set_skip_cancelled_check(true); + SendAsyncRpc(stub, param); + ++rpcs_sent; + } + EXPECT_TRUE(WaitForChannelReady(channel.get())); + + // Bring down the network + NetworkDown(); + + std::thread thd = std::thread([this, &rpcs_sent]() { + void* got_tag; + bool ok = false; + bool network_down = true; + int total_completions = 0; + + while (CQNext(&got_tag, &ok)) { + ++total_completions; + GPR_ASSERT(ok); + AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag); + int stream_id = GetStreamID(call->context); + if (!call->status.ok()) { + gpr_log(GPR_DEBUG, "RPC with stream_id %d failed with error: %s", + stream_id, call->status.error_message().c_str()); + // Bring network up when RPCs start failing + if (network_down) { + NetworkUp(); + network_down = false; + } + } else { + gpr_log(GPR_DEBUG, "RPC with stream_id %d succeeded", stream_id); + } + delete call; + } + // Remove line below and uncomment the following line after Apple CFStream + // bug has been fixed. + (void)rpcs_sent; + // EXPECT_EQ(total_completions, rpcs_sent); + }); + + for (int i = 0; i < 100; ++i) { + RequestParams param; + param.set_skip_cancelled_check(true); + SendAsyncRpc(stub, param); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + ++rpcs_sent; + } + + ShutdownCQ(); + + thd.join(); +} + +// Send a bunch of RPCs, some of which are expected to fail. +// We should get back a response for all RPCs +TEST_P(CFStreamTest, ConcurrentRpc) { + auto channel = BuildChannel(); + auto stub = BuildStub(channel); + std::atomic_int rpcs_sent{0}; + std::thread thd = std::thread([this, &rpcs_sent]() { + void* got_tag; + bool ok = false; + int total_completions = 0; + + while (CQNext(&got_tag, &ok)) { + ++total_completions; + GPR_ASSERT(ok); + AsyncClientCall* call = static_cast<AsyncClientCall*>(got_tag); + int stream_id = GetStreamID(call->context); + if (!call->status.ok()) { + gpr_log(GPR_DEBUG, "RPC with stream_id %d failed with error: %s", + stream_id, call->status.error_message().c_str()); + // Bring network up when RPCs start failing + } else { + gpr_log(GPR_DEBUG, "RPC with stream_id %d succeeded", stream_id); + } + delete call; + } + // Remove line below and uncomment the following line after Apple CFStream + // bug has been fixed. + (void)rpcs_sent; + // EXPECT_EQ(total_completions, rpcs_sent); + }); + + for (int i = 0; i < 10; ++i) { + if (i % 3 == 0) { + RequestParams param; + ErrorStatus* error = param.mutable_expected_error(); + error->set_code(StatusCode::INTERNAL); + error->set_error_message("internal error"); + SendAsyncRpc(stub, param); + } else if (i % 5 == 0) { + RequestParams param; + param.set_echo_metadata(true); + DebugInfo* info = param.mutable_debug_info(); + info->add_stack_entries("stack_entry1"); + info->add_stack_entries("stack_entry2"); + info->set_detail("detailed debug info"); + SendAsyncRpc(stub, param); + } else { + SendAsyncRpc(stub); + } + ++rpcs_sent; + } + + ShutdownCQ(); + + thd.join(); +} + +} // namespace +} // namespace testing +} // namespace grpc +#endif // GRPC_CFSTREAM + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + grpc::testing::TestEnvironment env(argc, argv); + gpr_setenv("grpc_cfstream", "1"); + const auto result = RUN_ALL_TESTS(); + return result; +} |