aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorabcdef <akotov@ydb.tech>2023-06-02 15:50:36 +0300
committerabcdef <akotov@ydb.tech>2023-06-02 15:50:36 +0300
commit045235991e9b5259c78c98e94d8c0d4d16849a1f (patch)
tree15a884c2fa1cf95f9a48fc9764c52b852aa23732
parent59e0045a61e61c2ac38878f2adc7ec91ca914cc1 (diff)
downloadydb-045235991e9b5259c78c98e94d8c0d4d16849a1f.tar.gz
fixed the TPersQueueTest::PreferredCluster_TwoEnabledClustersAndWriteSessionsWithDifferentPreferredCluster_SessionWithMismatchedClusterDiesAndOthersAlive test
Сессия записи может закрываться до окончания инициализации. В результате клиент в первом сообщении вместо `InitResponse` может получить ошибку с кодом `PREFERRED_CLUSTER_MISMATCHED`. Эта особенность учитывается в тесте
-rw-r--r--ydb/services/persqueue_v1/persqueue_ut.cpp6
-rw-r--r--ydb/services/persqueue_v1/ut/api_test_setup.h40
-rw-r--r--ydb/services/persqueue_v1/ut/test_utils.h25
3 files changed, 63 insertions, 8 deletions
diff --git a/ydb/services/persqueue_v1/persqueue_ut.cpp b/ydb/services/persqueue_v1/persqueue_ut.cpp
index 5e9d2a6cc89..1cbdd25bb5c 100644
--- a/ydb/services/persqueue_v1/persqueue_ut.cpp
+++ b/ydb/services/persqueue_v1/persqueue_ut.cpp
@@ -4153,14 +4153,16 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
setup.GetPQConfig().SetClustersUpdateTimeoutSec(0);
setup.GetPQConfig().SetRemoteClusterEnabledDelaySec(0);
setup.GetPQConfig().SetCloseClientSessionWithEnabledRemotePreferredClusterDelaySec(0);
+
auto sessionWithNoPreferredCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(TString()));
auto sessionWithLocalPreffedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetLocalCluster()));
- auto sessionWithRemotePrefferedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster()));
+ auto sessionWithRemotePrefferedCluster = setup.InitWriteSession(GenerateSessionSetupWithPreferredCluster(setup.GetRemoteCluster()), true);
+
grpc::ClientContext context;
auto sessionWithNoInitialization = setup.GetPersQueueService()->StreamingWrite(&context);
log << TLOG_INFO << "Wait for session with remote preferred cluster to die";
- AssertStreamingSessionDead(sessionWithRemotePrefferedCluster.first, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED);
+ AssertStreamingSessionDead(sessionWithRemotePrefferedCluster.Stream, Ydb::StatusIds::ABORTED, Ydb::PersQueue::ErrorCode::PREFERRED_CLUSTER_MISMATCHED, sessionWithRemotePrefferedCluster.FirstMessage);
AssertStreamingSessionAlive(sessionWithNoPreferredCluster.first);
AssertStreamingSessionAlive(sessionWithLocalPreffedCluster.first);
diff --git a/ydb/services/persqueue_v1/ut/api_test_setup.h b/ydb/services/persqueue_v1/ut/api_test_setup.h
index 8d5d82b33d2..e0cde5f3f93 100644
--- a/ydb/services/persqueue_v1/ut/api_test_setup.h
+++ b/ydb/services/persqueue_v1/ut/api_test_setup.h
@@ -87,10 +87,8 @@ public:
std::pair<std::unique_ptr<grpc::ClientReaderWriter<TStreamingWriteClientMessage, TStreamingWriteServerMessage>>, std::unique_ptr<grpc::ClientContext>> InitWriteSession(
const typename TStreamingWriteClientMessage::InitRequest& setup = TStreamingWriteClientMessage::InitRequest())
{
- auto context = std::make_unique<grpc::ClientContext>();
- auto stream = service->StreamingWrite(context.get());
- InitSession(stream, setup);
- return std::make_pair(std::move(stream), std::move(context));
+ auto result = InitWriteSession(setup, false);
+ return {std::move(result.Stream), std::move(result.Context)};
}
// Initializes session with default (possible overwriten by setup parameter) values and returns initialization response
@@ -98,6 +96,32 @@ public:
TServerMessage InitSession(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream,
const typename TClientMessage::InitRequest& setup = typename TClientMessage::InitRequest())
{
+ return InitSession(stream, setup, false);
+ }
+
+ using TClientContextPtr = std::unique_ptr<grpc::ClientContext>;
+ using TStreamPtr = std::unique_ptr<grpc::ClientReaderWriter<TStreamingWriteClientMessage, TStreamingWriteServerMessage>>;
+
+ struct TInitWriteSessionResult {
+ TClientContextPtr Context;
+ TStreamPtr Stream;
+ TStreamingWriteServerMessage FirstMessage;
+ };
+
+ TInitWriteSessionResult InitWriteSession(const typename TStreamingWriteClientMessage::InitRequest& setup,
+ bool mayBeAborted)
+ {
+ auto context = std::make_unique<grpc::ClientContext>();
+ auto stream = service->StreamingWrite(context.get());
+ auto message = InitSession(stream, setup, mayBeAborted);
+ return {std::move(context), std::move(stream), std::move(message)};
+ }
+
+ template<typename TClientMessage, typename TServerMessage>
+ TServerMessage InitSession(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream,
+ const typename TClientMessage::InitRequest& setup,
+ bool mayBeAborted)
+ {
TClientMessage clientMessage;
// TODO: Replace with MergeFrom?
clientMessage.mutable_init_request()->CopyFrom(setup);
@@ -113,8 +137,12 @@ public:
Log << TLOG_INFO << "Wait for \"init_response\"";
AssertSuccessfullStreamingOperation(stream->Read(&serverMessage), stream);
Cerr << "Init response: " << serverMessage.ShortDebugString() << Endl;
- UNIT_ASSERT_C(serverMessage.server_message_case() == TServerMessage::kInitResponse, serverMessage);
- Log << TLOG_INFO << "Session ID is " << serverMessage.init_response().session_id().Quote();
+
+ if ((serverMessage.status() != Ydb::StatusIds::ABORTED) || !mayBeAborted) {
+ UNIT_ASSERT_C(serverMessage.server_message_case() == TServerMessage::kInitResponse, serverMessage);
+ Log << TLOG_INFO << "Session ID is " << serverMessage.init_response().session_id().Quote();
+ }
+
return serverMessage;
}
};
diff --git a/ydb/services/persqueue_v1/ut/test_utils.h b/ydb/services/persqueue_v1/ut/test_utils.h
index 915e7906b34..af6d2d22c57 100644
--- a/ydb/services/persqueue_v1/ut/test_utils.h
+++ b/ydb/services/persqueue_v1/ut/test_utils.h
@@ -81,3 +81,28 @@ void AssertStreamingSessionDead(std::unique_ptr<grpc::ClientReaderWriter<TClient
UNIT_ASSERT_C(expectedErrorCode == actualErrorCode, serverMessage);
}
+template<typename TClientMessage, typename TServerMessage>
+void AssertStreamingSessionDead(std::unique_ptr<grpc::ClientReaderWriter<TClientMessage, TServerMessage>>& stream,
+ const Ydb::StatusIds::StatusCode expectedStatus, const Ydb::PersQueue::ErrorCode::ErrorCode expectedErrorCode,
+ const TServerMessage& firstMessage)
+{
+ auto ensureExpectedError = [&](const TServerMessage& serverMessage) {
+ UNIT_ASSERT_LE(1, serverMessage.issues_size());
+ // TODO: Why namespace duplicates enum name "ErrorCode::ErrorCode"?
+ // TODO: Why "Ydb::PersQueue::ErrorCode::ErrorCode" doesn't work with streaming output like "Ydb::StatusIds::StatusCode" does?
+ auto actualErrorCode = static_cast<Ydb::PersQueue::ErrorCode::ErrorCode>(serverMessage.issues(0).issue_code());
+ UNIT_ASSERT_C(expectedErrorCode == actualErrorCode, serverMessage);
+ };
+
+ if (firstMessage.status() == expectedStatus) {
+ ensureExpectedError(firstMessage);
+ return;
+ }
+
+ TServerMessage serverMessage;
+ auto res = stream->Read(&serverMessage);
+ Cerr << serverMessage.DebugString() << "\n";
+ AssertSuccessfullStreamingOperation(res, stream);
+ UNIT_ASSERT_VALUES_EQUAL(expectedStatus, serverMessage.status());
+ ensureExpectedError(serverMessage);
+}