aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexbogo <alexbogo@ydb.tech>2022-09-05 14:22:33 +0300
committeralexbogo <alexbogo@ydb.tech>2022-09-05 14:22:33 +0300
commit53dc0b3016008947b9f1dadf496716a5868bae7c (patch)
treebf02637986f604617b5d595010c11686204c7b02
parent7597a329b3256b41c92d36c943cf18dd0de845ea (diff)
downloadydb-53dc0b3016008947b9f1dadf496716a5868bae7c.tar.gz
[mirrorer] async creation of credential factory
init
-rw-r--r--ydb/core/persqueue/actor_persqueue_client_iface.h23
-rw-r--r--ydb/core/persqueue/events/internal.h18
-rw-r--r--ydb/core/persqueue/mirrorer.cpp68
-rw-r--r--ydb/core/persqueue/mirrorer.h6
4 files changed, 90 insertions, 25 deletions
diff --git a/ydb/core/persqueue/actor_persqueue_client_iface.h b/ydb/core/persqueue/actor_persqueue_client_iface.h
index 29478c4de0b..76c1a8545ac 100644
--- a/ydb/core/persqueue/actor_persqueue_client_iface.h
+++ b/ydb/core/persqueue/actor_persqueue_client_iface.h
@@ -34,9 +34,15 @@ public:
Driver = std::make_shared<NYdb::TDriver>(driverConfig);
}
- virtual std::shared_ptr<NYdb::ICredentialsProviderFactory> GetCredentialsProvider(
+ NThreading::TFuture<NYdb::TCredentialsProviderFactoryPtr> GetCredentialsProvider(
const NKikimrPQ::TMirrorPartitionConfig::TCredentials& cred
- ) const = 0;
+ ) const noexcept {
+ try {
+ return GetCredentialsProviderImpl(cred);
+ } catch(...) {
+ return NThreading::MakeErrorFuture<NYdb::TCredentialsProviderFactoryPtr>(std::current_exception());
+ }
+ }
virtual std::shared_ptr<NYdb::NPersQueue::IReadSession> GetReadSession(
const NKikimrPQ::TMirrorPartitionConfig& config,
@@ -53,18 +59,24 @@ public:
}
protected:
+ virtual NThreading::TFuture<NYdb::TCredentialsProviderFactoryPtr> GetCredentialsProviderImpl(
+ const NKikimrPQ::TMirrorPartitionConfig::TCredentials& cred
+ ) const = 0;
+
+
+protected:
mutable TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr;
mutable std::shared_ptr<NYdb::TDriver> Driver;
};
class TPersQueueMirrorReaderFactory : public IPersQueueMirrorReaderFactory {
-public:
- std::shared_ptr<NYdb::ICredentialsProviderFactory> GetCredentialsProvider(
+protected:
+ NThreading::TFuture<NYdb::TCredentialsProviderFactoryPtr> GetCredentialsProviderImpl(
const NKikimrPQ::TMirrorPartitionConfig::TCredentials& cred
) const override {
switch (cred.GetCredentialsCase()) {
case NKikimrPQ::TMirrorPartitionConfig::TCredentials::CREDENTIALS_NOT_SET: {
- return NYdb::CreateInsecureCredentialsProviderFactory();
+ return NThreading::MakeFuture(NYdb::CreateInsecureCredentialsProviderFactory());
}
default: {
ythrow yexception() << "unsupported credentials type " << ui64(cred.GetCredentialsCase());
@@ -72,6 +84,7 @@ public:
}
}
+public:
std::shared_ptr<NYdb::NPersQueue::IReadSession> GetReadSession(
const NKikimrPQ::TMirrorPartitionConfig& config,
ui32 partition,
diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h
index c5a8d733e7e..ac4e6be45e6 100644
--- a/ydb/core/persqueue/events/internal.h
+++ b/ydb/core/persqueue/events/internal.h
@@ -12,6 +12,10 @@
#include <util/generic/maybe.h>
+namespace NYdb {
+ class ICredentialsProviderFactory;
+}
+
namespace NKikimr {
namespace NPQ {
@@ -109,6 +113,7 @@ struct TEvPQ {
EvReadLimiterCounters,
EvRetryWrite,
EvInitCredentials,
+ EvCredentialsCreated,
EvCreateConsumer,
EvRequestPartitionStatus,
EvReaderEventArrived,
@@ -607,6 +612,19 @@ struct TEvPQ {
TEvInitCredentials()
{}
};
+
+ struct TEvCredentialsCreated : public TEventLocal<TEvCredentialsCreated, EvCredentialsCreated> {
+ TEvCredentialsCreated(const TString& error)
+ : Error(error)
+ {}
+
+ TEvCredentialsCreated(std::shared_ptr<NYdb::ICredentialsProviderFactory> credentials)
+ : Credentials(credentials)
+ {}
+
+ std::shared_ptr<NYdb::ICredentialsProviderFactory> Credentials;
+ std::optional<TString> Error;
+ };
struct TEvCreateConsumer : public TEventLocal<TEvCreateConsumer, EvCreateConsumer> {
TEvCreateConsumer()
diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp
index 021a48a5df9..e7cb23ce46b 100644
--- a/ydb/core/persqueue/mirrorer.cpp
+++ b/ydb/core/persqueue/mirrorer.cpp
@@ -242,7 +242,8 @@ void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorConte
LastStateLogTimestamp = ctx.Now();
LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription()
<< "[STATE] current state=" << GetCurrentState()
- << ", read session=" << bool(ReadSession) << ", credentials provider=" << bool(CredentialsProvider));
+ << ", read session=" << bool(ReadSession) << ", credentials provider=" << bool(CredentialsProvider)
+ << ", credentials request inflight=" << CredentialsRequestInFlight);
if (ReadSession) {
LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription()
<< "[STATE] read session id " << ReadSession->GetSessionId());
@@ -317,8 +318,7 @@ void TMirrorer::HandleChangeConfig(TEvPQ::TEvChangePartitionConfig::TPtr& ev, co
Config = ev->Get()->Config.GetPartitionConfig().GetMirrorFrom();
LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " changing config");
- Become(&TThis::StateInitConsumer);
- ctx.Send(SelfId(), new TEvPQ::TEvInitCredentials);
+ StartInit(ctx);
}
}
@@ -361,15 +361,49 @@ void TMirrorer::TryToWrite(const TActorContext& ctx) {
void TMirrorer::HandleInitCredentials(TEvPQ::TEvInitCredentials::TPtr& /*ev*/, const TActorContext& ctx) {
+ if (CredentialsRequestInFlight) {
+ LOG_WARN_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " credentials request already inflight.");
+ return;
+ }
LastInitStageTimestamp = ctx.Now();
- try {
- RecreateCredentialsProvider(ctx);
- } catch(...) {
- ProcessError(ctx, "cannot initialize credentials provider: " + CurrentExceptionMessage());
+ CredentialsProvider = nullptr;
+
+ auto factory = AppData(ctx)->PersQueueMirrorReaderFactory;
+ Y_VERIFY(factory);
+ auto future = factory->GetCredentialsProvider(Config.GetCredentials());
+ future.Subscribe(
+ [
+ actorSystem = ctx.ExecutorThread.ActorSystem,
+ selfId = SelfId()
+ ](const NThreading::TFuture<NYdb::TCredentialsProviderFactoryPtr>& result) {
+ THolder<TEvPQ::TEvCredentialsCreated> ev;
+ if (result.HasException()) {
+ TString error;
+ try {
+ result.TryRethrow();
+ } catch(...) {
+ error = CurrentExceptionMessage();
+ }
+ ev = MakeHolder<TEvPQ::TEvCredentialsCreated>(error);
+ } else {
+ ev = MakeHolder<TEvPQ::TEvCredentialsCreated>(result.GetValue());
+ }
+ actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev.Release()));
+ }
+ );
+ CredentialsRequestInFlight = true;
+}
+
+void TMirrorer::HandleCredentialsCreated(TEvPQ::TEvCredentialsCreated::TPtr& ev, const TActorContext& ctx) {
+ CredentialsRequestInFlight = false;
+ if (ev->Get()->Error) {
+ ProcessError(ctx, TStringBuilder() << "cannot initialize credentials provider: " << ev->Get()->Error.value());
ScheduleWithIncreasingTimeout<TEvPQ::TEvInitCredentials>(SelfId(), ConsumerInitInterval, CONSUMER_INIT_INTERVAL_MAX, ctx);
return;
}
- LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " credentials provider created");
+
+ CredentialsProvider = ev->Get()->Credentials;
+ LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " credentials provider created " << bool(CredentialsProvider));
ConsumerInitInterval = CONSUMER_INIT_INTERVAL_START;
ScheduleConsumerCreation(ctx);
}
@@ -424,8 +458,14 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont
return logPrefix + message;
});
- ReadSession = factory->GetReadSession(Config, Partition, CredentialsProvider, MAX_BYTES_IN_FLIGHT, log);
-
+ try {
+ ReadSession = factory->GetReadSession(Config, Partition, CredentialsProvider, MAX_BYTES_IN_FLIGHT, log);
+ } catch(...) {
+ ProcessError(ctx, TStringBuilder() << "got an exception during the creation read session: " << CurrentExceptionMessage());
+ ScheduleConsumerCreation(ctx);
+ return;
+ }
+
LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription()
<< " read session created: " << ReadSession->GetSessionId());
@@ -487,14 +527,6 @@ void TMirrorer::ScheduleConsumerCreation(const TActorContext& ctx) {
ScheduleWithIncreasingTimeout<TEvPQ::TEvCreateConsumer>(SelfId(), ConsumerInitInterval, CONSUMER_INIT_INTERVAL_MAX, ctx);
}
-void TMirrorer::RecreateCredentialsProvider(const TActorContext& ctx) {
- CredentialsProvider = nullptr;
-
- auto factory = AppData(ctx)->PersQueueMirrorReaderFactory;
- Y_VERIFY(factory);
- CredentialsProvider = factory->GetCredentialsProvider(Config.GetCredentials());
-}
-
TString TMirrorer::MirrorerDescription() const {
return TStringBuilder() << "[mirrorer for " << TopicConverter->GetPrintableString() << ", partition " << Partition << ']';
}
diff --git a/ydb/core/persqueue/mirrorer.h b/ydb/core/persqueue/mirrorer.h
index dae06d6d708..8c247147fa7 100644
--- a/ydb/core/persqueue/mirrorer.h
+++ b/ydb/core/persqueue/mirrorer.h
@@ -56,6 +56,7 @@ private:
TRACE_EVENT(NKikimrServices::PQ_MIRRORER);
switch (ev->GetTypeRewrite()) {
HFuncTraced(TEvPQ::TEvInitCredentials, HandleInitCredentials);
+ HFuncTraced(TEvPQ::TEvCredentialsCreated, HandleCredentialsCreated);
HFuncTraced(TEvPQ::TEvChangePartitionConfig, HandleChangeConfig);
HFuncTraced(TEvPQ::TEvCreateConsumer, CreateConsumer);
HFuncTraced(TEvPQ::TEvRetryWrite, HandleRetryWrite);
@@ -106,7 +107,6 @@ private:
const NKikimrClient::TPersQueuePartitionResponse& response
);
void ScheduleConsumerCreation(const TActorContext& ctx);
- void RecreateCredentialsProvider(const TActorContext& ctx);
void StartInit(const TActorContext& ctx);
void RetryWrite(const TActorContext& ctx);
@@ -137,6 +137,7 @@ public:
void TryToRead(const TActorContext& ctx);
void TryToWrite(const TActorContext& ctx);
void HandleInitCredentials(TEvPQ::TEvInitCredentials::TPtr& ev, const TActorContext& ctx);
+ void HandleCredentialsCreated(TEvPQ::TEvCredentialsCreated::TPtr& ev, const TActorContext& ctx);
void HandleRetryWrite(TEvPQ::TEvRetryWrite::TPtr& ev, const TActorContext& ctx);
void HandleWakeup(const TActorContext& ctx);
void CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr& ev, const TActorContext& ctx);
@@ -164,7 +165,7 @@ private:
std::optional<NKikimrClient::TPersQueuePartitionRequest> WriteRequestInFlight;
TDuration WriteRetryTimeout = WRITE_RETRY_TIMEOUT_START;
TInstant WriteRequestTimestamp;
- std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProvider;
+ NYdb::TCredentialsProviderFactoryPtr CredentialsProvider;
std::shared_ptr<NYdb::NPersQueue::IReadSession> ReadSession;
ui64 ReaderGeneration = 0;
NYdb::NPersQueue::TPartitionStream::TPtr PartitionStream;
@@ -178,6 +179,7 @@ private:
TTabletCountersBase Counters;
bool WaitNextReaderEventInFlight = false;
+ bool CredentialsRequestInFlight = false;
bool WasSuccessfulRecording = false;
TInstant LastStateLogTimestamp;