diff options
author | alexbogo <alexbogo@ydb.tech> | 2022-09-05 14:22:33 +0300 |
---|---|---|
committer | alexbogo <alexbogo@ydb.tech> | 2022-09-05 14:22:33 +0300 |
commit | 53dc0b3016008947b9f1dadf496716a5868bae7c (patch) | |
tree | bf02637986f604617b5d595010c11686204c7b02 | |
parent | 7597a329b3256b41c92d36c943cf18dd0de845ea (diff) | |
download | ydb-53dc0b3016008947b9f1dadf496716a5868bae7c.tar.gz |
[mirrorer] async creation of credential factory
init
-rw-r--r-- | ydb/core/persqueue/actor_persqueue_client_iface.h | 23 | ||||
-rw-r--r-- | ydb/core/persqueue/events/internal.h | 18 | ||||
-rw-r--r-- | ydb/core/persqueue/mirrorer.cpp | 68 | ||||
-rw-r--r-- | ydb/core/persqueue/mirrorer.h | 6 |
4 files changed, 90 insertions, 25 deletions
diff --git a/ydb/core/persqueue/actor_persqueue_client_iface.h b/ydb/core/persqueue/actor_persqueue_client_iface.h index 29478c4de0b..76c1a8545ac 100644 --- a/ydb/core/persqueue/actor_persqueue_client_iface.h +++ b/ydb/core/persqueue/actor_persqueue_client_iface.h @@ -34,9 +34,15 @@ public: Driver = std::make_shared<NYdb::TDriver>(driverConfig); } - virtual std::shared_ptr<NYdb::ICredentialsProviderFactory> GetCredentialsProvider( + NThreading::TFuture<NYdb::TCredentialsProviderFactoryPtr> GetCredentialsProvider( const NKikimrPQ::TMirrorPartitionConfig::TCredentials& cred - ) const = 0; + ) const noexcept { + try { + return GetCredentialsProviderImpl(cred); + } catch(...) { + return NThreading::MakeErrorFuture<NYdb::TCredentialsProviderFactoryPtr>(std::current_exception()); + } + } virtual std::shared_ptr<NYdb::NPersQueue::IReadSession> GetReadSession( const NKikimrPQ::TMirrorPartitionConfig& config, @@ -53,18 +59,24 @@ public: } protected: + virtual NThreading::TFuture<NYdb::TCredentialsProviderFactoryPtr> GetCredentialsProviderImpl( + const NKikimrPQ::TMirrorPartitionConfig::TCredentials& cred + ) const = 0; + + +protected: mutable TDeferredActorLogBackend::TSharedAtomicActorSystemPtr ActorSystemPtr; mutable std::shared_ptr<NYdb::TDriver> Driver; }; class TPersQueueMirrorReaderFactory : public IPersQueueMirrorReaderFactory { -public: - std::shared_ptr<NYdb::ICredentialsProviderFactory> GetCredentialsProvider( +protected: + NThreading::TFuture<NYdb::TCredentialsProviderFactoryPtr> GetCredentialsProviderImpl( const NKikimrPQ::TMirrorPartitionConfig::TCredentials& cred ) const override { switch (cred.GetCredentialsCase()) { case NKikimrPQ::TMirrorPartitionConfig::TCredentials::CREDENTIALS_NOT_SET: { - return NYdb::CreateInsecureCredentialsProviderFactory(); + return NThreading::MakeFuture(NYdb::CreateInsecureCredentialsProviderFactory()); } default: { ythrow yexception() << "unsupported credentials type " << ui64(cred.GetCredentialsCase()); @@ -72,6 +84,7 @@ public: } } +public: std::shared_ptr<NYdb::NPersQueue::IReadSession> GetReadSession( const NKikimrPQ::TMirrorPartitionConfig& config, ui32 partition, diff --git a/ydb/core/persqueue/events/internal.h b/ydb/core/persqueue/events/internal.h index c5a8d733e7e..ac4e6be45e6 100644 --- a/ydb/core/persqueue/events/internal.h +++ b/ydb/core/persqueue/events/internal.h @@ -12,6 +12,10 @@ #include <util/generic/maybe.h> +namespace NYdb { + class ICredentialsProviderFactory; +} + namespace NKikimr { namespace NPQ { @@ -109,6 +113,7 @@ struct TEvPQ { EvReadLimiterCounters, EvRetryWrite, EvInitCredentials, + EvCredentialsCreated, EvCreateConsumer, EvRequestPartitionStatus, EvReaderEventArrived, @@ -607,6 +612,19 @@ struct TEvPQ { TEvInitCredentials() {} }; + + struct TEvCredentialsCreated : public TEventLocal<TEvCredentialsCreated, EvCredentialsCreated> { + TEvCredentialsCreated(const TString& error) + : Error(error) + {} + + TEvCredentialsCreated(std::shared_ptr<NYdb::ICredentialsProviderFactory> credentials) + : Credentials(credentials) + {} + + std::shared_ptr<NYdb::ICredentialsProviderFactory> Credentials; + std::optional<TString> Error; + }; struct TEvCreateConsumer : public TEventLocal<TEvCreateConsumer, EvCreateConsumer> { TEvCreateConsumer() diff --git a/ydb/core/persqueue/mirrorer.cpp b/ydb/core/persqueue/mirrorer.cpp index 021a48a5df9..e7cb23ce46b 100644 --- a/ydb/core/persqueue/mirrorer.cpp +++ b/ydb/core/persqueue/mirrorer.cpp @@ -242,7 +242,8 @@ void TMirrorer::Handle(TEvPQ::TEvUpdateCounters::TPtr& /*ev*/, const TActorConte LastStateLogTimestamp = ctx.Now(); LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << "[STATE] current state=" << GetCurrentState() - << ", read session=" << bool(ReadSession) << ", credentials provider=" << bool(CredentialsProvider)); + << ", read session=" << bool(ReadSession) << ", credentials provider=" << bool(CredentialsProvider) + << ", credentials request inflight=" << CredentialsRequestInFlight); if (ReadSession) { LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << "[STATE] read session id " << ReadSession->GetSessionId()); @@ -317,8 +318,7 @@ void TMirrorer::HandleChangeConfig(TEvPQ::TEvChangePartitionConfig::TPtr& ev, co Config = ev->Get()->Config.GetPartitionConfig().GetMirrorFrom(); LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " changing config"); - Become(&TThis::StateInitConsumer); - ctx.Send(SelfId(), new TEvPQ::TEvInitCredentials); + StartInit(ctx); } } @@ -361,15 +361,49 @@ void TMirrorer::TryToWrite(const TActorContext& ctx) { void TMirrorer::HandleInitCredentials(TEvPQ::TEvInitCredentials::TPtr& /*ev*/, const TActorContext& ctx) { + if (CredentialsRequestInFlight) { + LOG_WARN_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " credentials request already inflight."); + return; + } LastInitStageTimestamp = ctx.Now(); - try { - RecreateCredentialsProvider(ctx); - } catch(...) { - ProcessError(ctx, "cannot initialize credentials provider: " + CurrentExceptionMessage()); + CredentialsProvider = nullptr; + + auto factory = AppData(ctx)->PersQueueMirrorReaderFactory; + Y_VERIFY(factory); + auto future = factory->GetCredentialsProvider(Config.GetCredentials()); + future.Subscribe( + [ + actorSystem = ctx.ExecutorThread.ActorSystem, + selfId = SelfId() + ](const NThreading::TFuture<NYdb::TCredentialsProviderFactoryPtr>& result) { + THolder<TEvPQ::TEvCredentialsCreated> ev; + if (result.HasException()) { + TString error; + try { + result.TryRethrow(); + } catch(...) { + error = CurrentExceptionMessage(); + } + ev = MakeHolder<TEvPQ::TEvCredentialsCreated>(error); + } else { + ev = MakeHolder<TEvPQ::TEvCredentialsCreated>(result.GetValue()); + } + actorSystem->Send(new NActors::IEventHandle(selfId, selfId, ev.Release())); + } + ); + CredentialsRequestInFlight = true; +} + +void TMirrorer::HandleCredentialsCreated(TEvPQ::TEvCredentialsCreated::TPtr& ev, const TActorContext& ctx) { + CredentialsRequestInFlight = false; + if (ev->Get()->Error) { + ProcessError(ctx, TStringBuilder() << "cannot initialize credentials provider: " << ev->Get()->Error.value()); ScheduleWithIncreasingTimeout<TEvPQ::TEvInitCredentials>(SelfId(), ConsumerInitInterval, CONSUMER_INIT_INTERVAL_MAX, ctx); return; } - LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " credentials provider created"); + + CredentialsProvider = ev->Get()->Credentials; + LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " credentials provider created " << bool(CredentialsProvider)); ConsumerInitInterval = CONSUMER_INIT_INTERVAL_START; ScheduleConsumerCreation(ctx); } @@ -424,8 +458,14 @@ void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorCont return logPrefix + message; }); - ReadSession = factory->GetReadSession(Config, Partition, CredentialsProvider, MAX_BYTES_IN_FLIGHT, log); - + try { + ReadSession = factory->GetReadSession(Config, Partition, CredentialsProvider, MAX_BYTES_IN_FLIGHT, log); + } catch(...) { + ProcessError(ctx, TStringBuilder() << "got an exception during the creation read session: " << CurrentExceptionMessage()); + ScheduleConsumerCreation(ctx); + return; + } + LOG_NOTICE_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " read session created: " << ReadSession->GetSessionId()); @@ -487,14 +527,6 @@ void TMirrorer::ScheduleConsumerCreation(const TActorContext& ctx) { ScheduleWithIncreasingTimeout<TEvPQ::TEvCreateConsumer>(SelfId(), ConsumerInitInterval, CONSUMER_INIT_INTERVAL_MAX, ctx); } -void TMirrorer::RecreateCredentialsProvider(const TActorContext& ctx) { - CredentialsProvider = nullptr; - - auto factory = AppData(ctx)->PersQueueMirrorReaderFactory; - Y_VERIFY(factory); - CredentialsProvider = factory->GetCredentialsProvider(Config.GetCredentials()); -} - TString TMirrorer::MirrorerDescription() const { return TStringBuilder() << "[mirrorer for " << TopicConverter->GetPrintableString() << ", partition " << Partition << ']'; } diff --git a/ydb/core/persqueue/mirrorer.h b/ydb/core/persqueue/mirrorer.h index dae06d6d708..8c247147fa7 100644 --- a/ydb/core/persqueue/mirrorer.h +++ b/ydb/core/persqueue/mirrorer.h @@ -56,6 +56,7 @@ private: TRACE_EVENT(NKikimrServices::PQ_MIRRORER); switch (ev->GetTypeRewrite()) { HFuncTraced(TEvPQ::TEvInitCredentials, HandleInitCredentials); + HFuncTraced(TEvPQ::TEvCredentialsCreated, HandleCredentialsCreated); HFuncTraced(TEvPQ::TEvChangePartitionConfig, HandleChangeConfig); HFuncTraced(TEvPQ::TEvCreateConsumer, CreateConsumer); HFuncTraced(TEvPQ::TEvRetryWrite, HandleRetryWrite); @@ -106,7 +107,6 @@ private: const NKikimrClient::TPersQueuePartitionResponse& response ); void ScheduleConsumerCreation(const TActorContext& ctx); - void RecreateCredentialsProvider(const TActorContext& ctx); void StartInit(const TActorContext& ctx); void RetryWrite(const TActorContext& ctx); @@ -137,6 +137,7 @@ public: void TryToRead(const TActorContext& ctx); void TryToWrite(const TActorContext& ctx); void HandleInitCredentials(TEvPQ::TEvInitCredentials::TPtr& ev, const TActorContext& ctx); + void HandleCredentialsCreated(TEvPQ::TEvCredentialsCreated::TPtr& ev, const TActorContext& ctx); void HandleRetryWrite(TEvPQ::TEvRetryWrite::TPtr& ev, const TActorContext& ctx); void HandleWakeup(const TActorContext& ctx); void CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr& ev, const TActorContext& ctx); @@ -164,7 +165,7 @@ private: std::optional<NKikimrClient::TPersQueuePartitionRequest> WriteRequestInFlight; TDuration WriteRetryTimeout = WRITE_RETRY_TIMEOUT_START; TInstant WriteRequestTimestamp; - std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProvider; + NYdb::TCredentialsProviderFactoryPtr CredentialsProvider; std::shared_ptr<NYdb::NPersQueue::IReadSession> ReadSession; ui64 ReaderGeneration = 0; NYdb::NPersQueue::TPartitionStream::TPtr PartitionStream; @@ -178,6 +179,7 @@ private: TTabletCountersBase Counters; bool WaitNextReaderEventInFlight = false; + bool CredentialsRequestInFlight = false; bool WasSuccessfulRecording = false; TInstant LastStateLogTimestamp; |