diff options
author | alexnick <alexnick@yandex-team.ru> | 2022-04-25 16:32:29 +0300 |
---|---|---|
committer | alexnick <alexnick@yandex-team.ru> | 2022-04-25 16:32:29 +0300 |
commit | 41bd13f29545710870e5768702169a659a699303 (patch) | |
tree | 75fe50d699a166310de6196ad46d545166e4043d | |
parent | cd2bde506787db5f4ce46b67da7f06b3d6f2350b (diff) | |
download | ydb-41bd13f29545710870e5768702169a659a699303.tar.gz |
fix for exceptions inside credential provider KIKIMR-14787
ref:98c94d1198056538e41968cf0ef4631e1ca9f10b
3 files changed, 95 insertions, 8 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp index 1336fc37256..5e05fa1f192 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp @@ -178,13 +178,12 @@ void TReadSession::ProceedWithoutClusterDiscovery() { TClusterSessionInfo& clusterSessionInfo = clusterSessionInfoIter->second; clusterSessionInfo.ClusterEndpoint = DbDriverState->DiscoveryEndpoint; clusterSessionInfo.Topics = Settings.Topics_; - CreateClusterSessionsImpl(); + CreateClusterSessionsImpl(deferred); } ScheduleDumpCountersToLog(); } -void TReadSession::CreateClusterSessionsImpl() { - TDeferredActions deferred; +void TReadSession::CreateClusterSessionsImpl(TDeferredActions& deferred) { // Create cluster sessions. ui64 partitionStreamIdStart = 1; const size_t clusterSessionsCount = ClusterSessions.size(); @@ -217,7 +216,8 @@ void TReadSession::CreateClusterSessionsImpl() { ErrorHandler, context, partitionStreamIdStart++, clusterSessionsCount); - clusterSessionInfo.Session->Start(); + + deferred.DeferStartSession(clusterSessionInfo.Session); } } @@ -290,7 +290,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu << normalizedName); } auto fullEndpoint = ApplyClusterEndpoint(DbDriverState->DiscoveryEndpoint, cluster.endpoint()); - if (clusterSessionInfo.ClusterEndpoint && clusterSessionInfo.ClusterEndpoint != fullEndpoint) { + if (clusterSessionInfo.ClusterEndpoint && clusterSessionInfo.ClusterEndpoint != fullEndpoint) { issues.AddIssue(TStringBuilder() << "Unexpected reply from cluster discovery. Different endpoints for one cluster name. Cluster: " << normalizedName << ". \"" << clusterSessionInfo.ClusterEndpoint << "\" vs \"" << fullEndpoint << "\""); @@ -317,7 +317,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu return; } - CreateClusterSessionsImpl(); + CreateClusterSessionsImpl(deferred); } ScheduleDumpCountersToLog(); } @@ -776,7 +776,7 @@ bool TSingleClusterReadSessionImpl::Reconnect(const TPlainStatus& status) { auto connectCallback = [weakThis = weak_from_this(), connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) { if (auto sharedThis = weakThis.lock()) { - sharedThis->OnConnect(std::move(st), std::move(processor), connectContext); + sharedThis->OnConnect(std::move(st), std::move(processor), connectContext); //OnConnect could be called inplace! } }; @@ -2296,6 +2296,10 @@ void TDeferredActions::DeferReconnection(std::shared_ptr<TSingleClusterReadSessi ReconnectionStatus = std::move(status); } +void TDeferredActions::DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl> session) { + Sessions.push_back(std::move(session)); +} + void TDeferredActions::DeferSignalWaiter(TWaiter&& waiter) { Waiters.emplace_back(std::move(waiter)); } @@ -2306,8 +2310,16 @@ void TDeferredActions::DoActions() { AbortSession(); Reconnect(); SignalWaiters(); + StartSessions(); } +void TDeferredActions::StartSessions() { + for (auto& session : Sessions) { + session->Start(); + } +} + + void TDeferredActions::Read() { if (ReadDst) { Y_ASSERT(Processor); diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h index 8e96833eb0a..3317cd28eb3 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h @@ -62,8 +62,10 @@ public: void DeferAbortSession(const IErrorHandler::TPtr& errorHandler, EStatus statusCode, const TString& message); void DeferAbortSession(const IErrorHandler::TPtr& errorHandler, TPlainStatus&& status); void DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl> session, const IErrorHandler::TPtr& errorHandler, TPlainStatus&& status); + void DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl> session); void DeferSignalWaiter(TWaiter&& waiter); + private: void DoActions(); @@ -72,6 +74,7 @@ private: void AbortSession(); void Reconnect(); void SignalWaiters(); + void StartSessions(); private: // Read. @@ -92,6 +95,10 @@ private: // Reconnection. std::shared_ptr<TSingleClusterReadSessionImpl> Session; TPlainStatus ReconnectionStatus; + + // Session to start + std::vector<std::shared_ptr<TSingleClusterReadSessionImpl>> Sessions; + }; class TDataDecompressionInfo { @@ -1085,7 +1092,7 @@ private: void OnClusterDiscovery(const TStatus& status, const Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult& result); void ProceedWithoutClusterDiscovery(); void RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions& deferred); - void CreateClusterSessionsImpl(); + void CreateClusterSessionsImpl(TDeferredActions& deferred); // Shutdown. diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp index c021a6b7c4f..65440859403 100644 --- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp +++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp @@ -411,5 +411,73 @@ Y_UNIT_TEST_SUITE(BasicUsage) { } writer->Close(); } + + class TBrokenCredentialsProvider : public ICredentialsProvider { + public: + TBrokenCredentialsProvider() {} + virtual ~TBrokenCredentialsProvider() {} + TStringType GetAuthInfo() const { + ythrow yexception() << "exception during creation"; + return ""; + } + bool IsValid() const { return true; } + }; + + class TBrokenCredentialsProviderFactory : public ICredentialsProviderFactory { + public: + TBrokenCredentialsProviderFactory() {} + + virtual ~TBrokenCredentialsProviderFactory() {} + virtual TCredentialsProviderPtr CreateProvider() const { + return std::make_shared<TBrokenCredentialsProvider>(); + } + + virtual TStringType GetClientIdentity() const { + return "abacaba"; + } + }; + + Y_UNIT_TEST(BrokenCredentialsProvider) { + + std::shared_ptr<ICredentialsProviderFactory> brokenCredentialsProviderFactory; + brokenCredentialsProviderFactory.reset(new TBrokenCredentialsProviderFactory{}); + auto setup = std::make_shared<TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME); + auto client = TPersQueueClient(setup->GetDriver(), TPersQueueClientSettings().CredentialsProviderFactory(brokenCredentialsProviderFactory)); + auto settings = setup->GetReadSessionSettings(); + settings.DisableClusterDiscovery(true) + .RetryPolicy(IRetryPolicy::GetNoRetryPolicy()); + std::shared_ptr<IReadSession> readSession = client.CreateReadSession(settings); + + Cerr << "Get event on client\n"; + auto event = *readSession->GetEvent(true); + std::visit(TOverloaded { + [&](TReadSessionEvent::TDataReceivedEvent&) { + UNIT_ASSERT(false); + }, + [&](TReadSessionEvent::TCommitAcknowledgementEvent&) { + UNIT_ASSERT(false); + }, + [&](TReadSessionEvent::TCreatePartitionStreamEvent& event) { + UNIT_ASSERT(false); + event.Confirm(); + }, + [&](TReadSessionEvent::TDestroyPartitionStreamEvent& event) { + UNIT_ASSERT(false); + event.Confirm(); + }, + [&](TReadSessionEvent::TPartitionStreamStatusEvent&) { + UNIT_FAIL("Test does not support lock sessions yet"); + }, + [&](TReadSessionEvent::TPartitionStreamClosedEvent&) { + UNIT_FAIL("Test does not support lock sessions yet"); + }, + [&](TSessionClosedEvent& event) { + Cerr << "Got close event: " << event.DebugString(); + } + + }, event); + + } + } } // namespace NYdb::NPersQueue::NTests |