diff options
author | abcdef <akotov@ydb.tech> | 2023-06-02 15:50:36 +0300 |
---|---|---|
committer | abcdef <akotov@ydb.tech> | 2023-06-02 15:50:36 +0300 |
commit | 045235991e9b5259c78c98e94d8c0d4d16849a1f (patch) | |
tree | 15a884c2fa1cf95f9a48fc9764c52b852aa23732 | |
parent | 59e0045a61e61c2ac38878f2adc7ec91ca914cc1 (diff) | |
download | ydb-045235991e9b5259c78c98e94d8c0d4d16849a1f.tar.gz |
fixed the TPersQueueTest::PreferredCluster_TwoEnabledClustersAndWriteSessionsWithDifferentPreferredCluster_SessionWithMismatchedClusterDiesAndOthersAlive test
Сессия записи может закрываться до окончания инициализации. В результате клиент в первом сообщении вместо `InitResponse` может получить ошибку с кодом `PREFERRED_CLUSTER_MISMATCHED`. Эта особенность учитывается в тесте
-rw-r--r-- | ydb/services/persqueue_v1/persqueue_ut.cpp | 6 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/api_test_setup.h | 40 | ||||
-rw-r--r-- | ydb/services/persqueue_v1/ut/test_utils.h | 25 |
3 files changed, 63 insertions, 8 deletions
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp index 5e9d2a6cc89..1cbdd25bb5c 100644 --- a/ydb/services/persqueue_v1/persqueue_ut.cpp +++ b/ydb/services/persqueue_v1/persqueue_ut.cpp @@ -4153,14 +4153,16 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) { setup.GetPQConfig().SetClustersUpdateTimeoutSec(0); setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0); setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(0); + auto sessionWithNoPreferredCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(TString())); auto sessionWithLocalPreffedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetLocalCluster())); - auto sessionWithRemotePrefferedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster())); + auto sessionWithRemotePrefferedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster()), true); + grpc::ClientContext context; auto sessionWithNoInitialization = setup.GetPersQueueService()->StreamingWrite(&context); log << TLOG_INFO << "Wait for session with remote preferred cluster to die"; - AssertStreamingSessionDead(sessionWithRemotePrefferedCluster.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED); + AssertStreamingSessionDead(sessionWithRemotePrefferedCluster.Stream, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED, sessionWithRemotePrefferedCluster.FirstMessage); AssertStreamingSessionAlive(sessionWithNoPreferredCluster.first); AssertStreamingSessionAlive(sessionWithLocalPreffedCluster.first); diff --git a/ydb/services/persqueue_v1/ut/api_test_setup.h b/ydb/services/persqueue_v1/ut/api_test_setup.h index 8d5d82b33d2..e0cde5f3f93 100644 --- a/ydb/services/persqueue_v1/ut/api_test_setup.h +++ b/ydb/services/persqueue_v1/ut/api_test_setup.h @@ -87,10 +87,8 @@ public: std::pair<std::unique_ptr<grpc::ClientReaderWriter<TStreamingWriteClientMessage, TStreamingWriteServerMessage>>, std::unique_ptr<grpc::ClientContext>> InitWriteSession( const typename TStreamingWriteClientMessage::InitRequest& setup = TStreamingWriteClientMessage::InitRequest()) { - auto context = std::make_unique<grpc::ClientContext>(); - auto stream = service->StreamingWrite(context.get()); - InitSession(stream, setup); - return std::make_pair(std::move(stream), std::move(context)); + auto result = InitWriteSession(setup, false); + return {std::move(result.Stream), std::move(result.Context)}; } // Initializes session with default (possible overwriten by setup parameter) values and returns initialization response @@ -98,6 +96,32 @@ public: TServerMessage InitSession(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream, const typename TClientMessage::InitRequest& setup = typename TClientMessage::InitRequest()) { + return InitSession(stream, setup, false); + } + + using TClientContextPtr = std::unique_ptr<grpc::ClientContext>; + using TStreamPtr = std::unique_ptr<grpc::ClientReaderWriter<TStreamingWriteClientMessage, TStreamingWriteServerMessage>>; + + struct TInitWriteSessionResult { + TClientContextPtr Context; + TStreamPtr Stream; + TStreamingWriteServerMessage FirstMessage; + }; + + TInitWriteSessionResult InitWriteSession(const typename TStreamingWriteClientMessage::InitRequest& setup, + bool mayBeAborted) + { + auto context = std::make_unique<grpc::ClientContext>(); + auto stream = service->StreamingWrite(context.get()); + auto message = InitSession(stream, setup, mayBeAborted); + return {std::move(context), std::move(stream), std::move(message)}; + } + + template<typename TClientMessage, typename TServerMessage> + TServerMessage InitSession(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream, + const typename TClientMessage::InitRequest& setup, + bool mayBeAborted) + { TClientMessage clientMessage; // TODO: Replace with MergeFrom? clientMessage.mutable_init_request()->CopyFrom(setup); @@ -113,8 +137,12 @@ public: Log << TLOG_INFO << "Wait for \"init_response\""; AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream); Cerr << "Init response: " << serverMessage.ShortDebugString() << Endl; - UNIT_ASSERT_C(serverMessage.server_message_case() == TServerMessage::kInitResponse, serverMessage); - Log << TLOG_INFO << "Session ID is " << serverMessage.init_response().session_id().Quote(); + + if ((serverMessage.status() != Ydb::StatusIds::ABORTED) || !mayBeAborted) { + UNIT_ASSERT_C(serverMessage.server_message_case() == TServerMessage::kInitResponse, serverMessage); + Log << TLOG_INFO << "Session ID is " << serverMessage.init_response().session_id().Quote(); + } + return serverMessage; } }; diff --git a/ydb/services/persqueue_v1/ut/test_utils.h b/ydb/services/persqueue_v1/ut/test_utils.h index 915e7906b34..af6d2d22c57 100644 --- a/ydb/services/persqueue_v1/ut/test_utils.h +++ b/ydb/services/persqueue_v1/ut/test_utils.h @@ -81,3 +81,28 @@ void AssertStreamingSessionDead(std::unique_ptr<grpc::ClientReaderWriter<TClient UNIT_ASSERT_C(expectedErrorCode == actualErrorCode, serverMessage); } +template<typename TClientMessage, typename TServerMessage> +void AssertStreamingSessionDead(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream, + const Ydb::StatusIds::StatusCode expectedStatus, const Ydb::PersQueue::ErrorCode::ErrorCode expectedErrorCode, + const TServerMessage& firstMessage) +{ + auto ensureExpectedError = [&](const TServerMessage& serverMessage) { + UNIT_ASSERT_LE(1, serverMessage.issues_size()); + // TODO: Why namespace duplicates enum name "ErrorCode::ErrorCode"? + // TODO: Why "Ydb::PersQueue::ErrorCode::ErrorCode" doesn't work with streaming output like "Ydb::StatusIds::StatusCode" does? + auto actualErrorCode = static_cast<Ydb::PersQueue::ErrorCode::ErrorCode>(serverMessage.issues(0).issue_code()); + UNIT_ASSERT_C(expectedErrorCode == actualErrorCode, serverMessage); + }; + + if (firstMessage.status() == expectedStatus) { + ensureExpectedError(firstMessage); + return; + } + + TServerMessage serverMessage; + auto res = stream->Read(&serverMessage); + Cerr << serverMessage.DebugString() << "\n"; + AssertSuccessfullStreamingOperation(res, stream); + UNIT_ASSERT_VALUES_EQUAL(expectedStatus, serverMessage.status()); + ensureExpectedError(serverMessage); +} |