aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@yandex-team.ru>2022-04-25 16:32:29 +0300
committeralexnick <alexnick@yandex-team.ru>2022-04-25 16:32:29 +0300
commit41bd13f29545710870e5768702169a659a699303 (patch)
tree75fe50d699a166310de6196ad46d545166e4043d
parentcd2bde506787db5f4ce46b67da7f06b3d6f2350b (diff)
downloadydb-41bd13f29545710870e5768702169a659a699303.tar.gz
fix for exceptions inside credential provider KIKIMR-14787
ref:98c94d1198056538e41968cf0ef4631e1ca9f10b
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp26
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h9
-rw-r--r--ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp68
3 files changed, 95 insertions, 8 deletions
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
index 1336fc37256..5e05fa1f192 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.cpp
@@ -178,13 +178,12 @@ void TReadSession::ProceedWithoutClusterDiscovery() {
TClusterSessionInfo& clusterSessionInfo = clusterSessionInfoIter->second;
clusterSessionInfo.ClusterEndpoint = DbDriverState->DiscoveryEndpoint;
clusterSessionInfo.Topics = Settings.Topics_;
- CreateClusterSessionsImpl();
+ CreateClusterSessionsImpl(deferred);
}
ScheduleDumpCountersToLog();
}
-void TReadSession::CreateClusterSessionsImpl() {
- TDeferredActions deferred;
+void TReadSession::CreateClusterSessionsImpl(TDeferredActions& deferred) {
// Create cluster sessions.
ui64 partitionStreamIdStart = 1;
const size_t clusterSessionsCount = ClusterSessions.size();
@@ -217,7 +216,8 @@ void TReadSession::CreateClusterSessionsImpl() {
ErrorHandler,
context,
partitionStreamIdStart++, clusterSessionsCount);
- clusterSessionInfo.Session->Start();
+
+ deferred.DeferStartSession(clusterSessionInfo.Session);
}
}
@@ -290,7 +290,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu
<< normalizedName);
}
auto fullEndpoint = ApplyClusterEndpoint(DbDriverState->DiscoveryEndpoint, cluster.endpoint());
- if (clusterSessionInfo.ClusterEndpoint && clusterSessionInfo.ClusterEndpoint != fullEndpoint) {
+ if (clusterSessionInfo.ClusterEndpoint && clusterSessionInfo.ClusterEndpoint != fullEndpoint) {
issues.AddIssue(TStringBuilder() << "Unexpected reply from cluster discovery. Different endpoints for one cluster name. Cluster: "
<< normalizedName << ". \"" << clusterSessionInfo.ClusterEndpoint << "\" vs \""
<< fullEndpoint << "\"");
@@ -317,7 +317,7 @@ void TReadSession::OnClusterDiscovery(const TStatus& status, const Ydb::PersQueu
return;
}
- CreateClusterSessionsImpl();
+ CreateClusterSessionsImpl(deferred);
}
ScheduleDumpCountersToLog();
}
@@ -776,7 +776,7 @@ bool TSingleClusterReadSessionImpl::Reconnect(const TPlainStatus& status) {
auto connectCallback = [weakThis = weak_from_this(), connectContext = connectContext](TPlainStatus&& st, typename IProcessor::TPtr&& processor) {
if (auto sharedThis = weakThis.lock()) {
- sharedThis->OnConnect(std::move(st), std::move(processor), connectContext);
+ sharedThis->OnConnect(std::move(st), std::move(processor), connectContext); //OnConnect could be called inplace!
}
};
@@ -2296,6 +2296,10 @@ void TDeferredActions::DeferReconnection(std::shared_ptr<TSingleClusterReadSessi
ReconnectionStatus = std::move(status);
}
+void TDeferredActions::DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl> session) {
+ Sessions.push_back(std::move(session));
+}
+
void TDeferredActions::DeferSignalWaiter(TWaiter&& waiter) {
Waiters.emplace_back(std::move(waiter));
}
@@ -2306,8 +2310,16 @@ void TDeferredActions::DoActions() {
AbortSession();
Reconnect();
SignalWaiters();
+ StartSessions();
}
+void TDeferredActions::StartSessions() {
+ for (auto& session : Sessions) {
+ session->Start();
+ }
+}
+
+
void TDeferredActions::Read() {
if (ReadDst) {
Y_ASSERT(Processor);
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
index 8e96833eb0a..3317cd28eb3 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.h
@@ -62,8 +62,10 @@ public:
void DeferAbortSession(const IErrorHandler::TPtr& errorHandler, EStatus statusCode, const TString& message);
void DeferAbortSession(const IErrorHandler::TPtr& errorHandler, TPlainStatus&& status);
void DeferReconnection(std::shared_ptr<TSingleClusterReadSessionImpl> session, const IErrorHandler::TPtr& errorHandler, TPlainStatus&& status);
+ void DeferStartSession(std::shared_ptr<TSingleClusterReadSessionImpl> session);
void DeferSignalWaiter(TWaiter&& waiter);
+
private:
void DoActions();
@@ -72,6 +74,7 @@ private:
void AbortSession();
void Reconnect();
void SignalWaiters();
+ void StartSessions();
private:
// Read.
@@ -92,6 +95,10 @@ private:
// Reconnection.
std::shared_ptr<TSingleClusterReadSessionImpl> Session;
TPlainStatus ReconnectionStatus;
+
+ // Session to start
+ std::vector<std::shared_ptr<TSingleClusterReadSessionImpl>> Sessions;
+
};
class TDataDecompressionInfo {
@@ -1085,7 +1092,7 @@ private:
void OnClusterDiscovery(const TStatus& status, const Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResult& result);
void ProceedWithoutClusterDiscovery();
void RestartClusterDiscoveryImpl(TDuration delay, TDeferredActions& deferred);
- void CreateClusterSessionsImpl();
+ void CreateClusterSessionsImpl(TDeferredActions& deferred);
// Shutdown.
diff --git a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp
index c021a6b7c4f..65440859403 100644
--- a/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_persqueue_core/ut/basic_usage_ut.cpp
@@ -411,5 +411,73 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
}
writer->Close();
}
+
+ class TBrokenCredentialsProvider : public ICredentialsProvider {
+ public:
+ TBrokenCredentialsProvider() {}
+ virtual ~TBrokenCredentialsProvider() {}
+ TStringType GetAuthInfo() const {
+ ythrow yexception() << "exception during creation";
+ return "";
+ }
+ bool IsValid() const { return true; }
+ };
+
+ class TBrokenCredentialsProviderFactory : public ICredentialsProviderFactory {
+ public:
+ TBrokenCredentialsProviderFactory() {}
+
+ virtual ~TBrokenCredentialsProviderFactory() {}
+ virtual TCredentialsProviderPtr CreateProvider() const {
+ return std::make_shared<TBrokenCredentialsProvider>();
+ }
+
+ virtual TStringType GetClientIdentity() const {
+ return "abacaba";
+ }
+ };
+
+ Y_UNIT_TEST(BrokenCredentialsProvider) {
+
+ std::shared_ptr<ICredentialsProviderFactory> brokenCredentialsProviderFactory;
+ brokenCredentialsProviderFactory.reset(new TBrokenCredentialsProviderFactory{});
+ auto setup = std::make_shared<TPersQueueYdbSdkTestSetup>(TEST_CASE_NAME);
+ auto client = TPersQueueClient(setup->GetDriver(), TPersQueueClientSettings().CredentialsProviderFactory(brokenCredentialsProviderFactory));
+ auto settings = setup->GetReadSessionSettings();
+ settings.DisableClusterDiscovery(true)
+ .RetryPolicy(IRetryPolicy::GetNoRetryPolicy());
+ std::shared_ptr<IReadSession> readSession = client.CreateReadSession(settings);
+
+ Cerr << "Get event on client\n";
+ auto event = *readSession->GetEvent(true);
+ std::visit(TOverloaded {
+ [&](TReadSessionEvent::TDataReceivedEvent&) {
+ UNIT_ASSERT(false);
+ },
+ [&](TReadSessionEvent::TCommitAcknowledgementEvent&) {
+ UNIT_ASSERT(false);
+ },
+ [&](TReadSessionEvent::TCreatePartitionStreamEvent& event) {
+ UNIT_ASSERT(false);
+ event.Confirm();
+ },
+ [&](TReadSessionEvent::TDestroyPartitionStreamEvent& event) {
+ UNIT_ASSERT(false);
+ event.Confirm();
+ },
+ [&](TReadSessionEvent::TPartitionStreamStatusEvent&) {
+ UNIT_FAIL("Test does not support lock sessions yet");
+ },
+ [&](TReadSessionEvent::TPartitionStreamClosedEvent&) {
+ UNIT_FAIL("Test does not support lock sessions yet");
+ },
+ [&](TSessionClosedEvent& event) {
+ Cerr << "Got close event: " << event.DebugString();
+ }
+
+ }, event);
+
+ }
+
}
} // namespace NYdb::NPersQueue::NTests