diff options
author | grigoriypisar <grigoriypisar@yandex-team.com> | 2023-08-31 11:17:21 +0300 |
---|---|---|
committer | grigoriypisar <grigoriypisar@yandex-team.com> | 2023-08-31 11:50:30 +0300 |
commit | a9e6862e2058d283ab327787eb4a85533fb81fdd (patch) | |
tree | 0169acf29c33a0dabb055e79febbd67705d5edee | |
parent | 318289efe1e924fb92c6f15642ded8d5b7b0713c (diff) | |
download | ydb-a9e6862e2058d283ab327787eb4a85533fb81fdd.tar.gz |
Added secrets snapshot waiting
Added secrets snapshot waiting timeout.
20 files changed, 143 insertions, 84 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 919b2a9a6e9..89b1f8b7faa 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -2106,7 +2106,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu new NYql::NLog::TTlsLogBackend(new TNullLogBackend()))); auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(), Config.GetAuthConfig().GetTokenAccessorConfig(), - Config.GetQueryServiceConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources)); + Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources)); setup->LocalServices.push_back(std::make_pair( NKqp::MakeKqpProxyID(NodeId), TActorSetupCmd(proxy, TMailboxType::HTSwap, appData->UserPoolId))); diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index b2121dc5141..7de2211aa82 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -57,8 +57,8 @@ public: return NKikimrServices::TActivity::KQP_COMPILE_ACTOR; } - TKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, - const TTableServiceConfig& serviceConfig, NYql::IHTTPGateway::TPtr httpGateway, + TKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, const TTableServiceConfig& serviceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const TString& uid, const TKqpQueryId& queryId, @@ -75,6 +75,7 @@ public: , UserToken(userToken) , DbCounters(dbCounters) , Config(MakeIntrusive<TKikimrConfiguration>()) + , MetadataProviderConfig(metadataProviderConfig) , CompilationTimeout(TDuration::MilliSeconds(serviceConfig.GetCompileTimeoutMs())) , CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor") , TempTablesState(std::move(tempTablesState)) @@ -117,7 +118,7 @@ public: counters->TxProxyMon = new NTxProxy::TTxProxyMon(AppData(ctx)->Counters); std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>( - TlsActivationContext->ActorSystem(), Config, true, TempTablesState); + TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds())); Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Database, std::move(loader), ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters); Gateway->SetToken(QueryId.Cluster, UserToken); @@ -379,6 +380,7 @@ private: TIntrusiveConstPtr<NACLib::TUserToken> UserToken; TKqpDbCountersPtr DbCounters; TKikimrConfiguration::TPtr Config; + NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig; TDuration CompilationTimeout; TInstant StartTime; TDuration CompileCpuTime; @@ -418,14 +420,14 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf kqpConfig.PredicateExtract20 = serviceConfig.GetPredicateExtract20(); } -IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, - const TTableServiceConfig& serviceConfig, NYql::IHTTPGateway::TPtr httpGateway, +IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, const TTableServiceConfig& serviceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const TString& uid, const TKqpQueryId& query, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpDbCountersPtr dbCounters, NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState) { - return new TKqpCompileActor(owner, kqpSettings, serviceConfig, std::move(httpGateway), moduleResolverState, + return new TKqpCompileActor(owner, kqpSettings, serviceConfig, metadataProviderConfig, std::move(httpGateway), moduleResolverState, counters, std::move(credentialsFactory), uid, query, userToken, dbCounters, std::move(traceId), std::move(tempTablesState)); } diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 3bd81e846f3..960fdc85151 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -307,12 +307,14 @@ public: return NKikimrServices::TActivity::KQP_COMPILE_SERVICE; } - TKqpCompileService(const TTableServiceConfig& serviceConfig, const TKqpSettings::TConstPtr& kqpSettings, + TKqpCompileService(const TTableServiceConfig& serviceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NYql::IHTTPGateway::TPtr httpGateway) : Config(serviceConfig) + , MetadataProviderConfig(metadataProviderConfig) , KqpSettings(kqpSettings) , ModuleResolverState(moduleResolverState) , Counters(counters) @@ -737,7 +739,7 @@ private: } void StartCompilation(TKqpCompileRequest&& request, const TActorContext& ctx) { - auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, Config, HttpGateway, ModuleResolverState, Counters, + auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, Config, MetadataProviderConfig, HttpGateway, ModuleResolverState, Counters, CredentialsFactory, request.Uid, request.Query, request.UserToken, request.DbCounters, request.CompileServiceSpan.GetTraceId(), std::move(request.TempTablesState)); auto compileActorId = ctx.ExecutorThread.RegisterActor(compileActor, TMailboxType::HTSwap, AppData(ctx)->UserPoolId); @@ -817,6 +819,7 @@ private: private: TTableServiceConfig Config; + NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig; TKqpSettings::TConstPtr KqpSettings; TIntrusivePtr<TModuleResolverState> ModuleResolverState; TIntrusivePtr<TKqpCounters> Counters; @@ -829,13 +832,14 @@ private: NYql::IHTTPGateway::TPtr HttpGateway; }; -IActor* CreateKqpCompileService(const TTableServiceConfig& serviceConfig, const TKqpSettings::TConstPtr& kqpSettings, +IActor* CreateKqpCompileService(const TTableServiceConfig& serviceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NYql::IHTTPGateway::TPtr httpGateway) { - return new TKqpCompileService(serviceConfig, kqpSettings, moduleResolverState, counters, + return new TKqpCompileService(serviceConfig, metadataProviderConfig, kqpSettings, moduleResolverState, counters, std::move(queryReplayFactory), std::move(credentialsFactory), std::move(httpGateway)); } diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h index 780ad843d18..32c28748e19 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.h +++ b/ydb/core/kqp/compile_service/kqp_compile_service.h @@ -10,6 +10,7 @@ namespace NKikimr { namespace NKqp { IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& serviceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, @@ -19,7 +20,8 @@ IActor* CreateKqpCompileComputationPatternService(const NKikimrConfig::TTableSer TIntrusivePtr<TKqpCounters> counters); IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, - const NKikimrConfig::TTableServiceConfig& serviceConfig, NYql::IHTTPGateway::TPtr httpGateway, + const NKikimrConfig::TTableServiceConfig& serviceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const TString& uid, const TKqpQueryId& query, diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index 95314b54fd7..ff6089f90fa 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -128,8 +128,8 @@ public: const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - const TActorId& creator) - : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, TWilsonKqp::DataExecuter, "DataExecuter") + const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime) + : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, TWilsonKqp::DataExecuter, "DataExecuter") , AsyncIoFactory(std::move(asyncIoFactory)) , StreamResult(streamResult) { @@ -295,6 +295,7 @@ public: hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, HandleRefreshSubscriberData); hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve); + hFunc(NActors::TEvents::TEvWakeup, HandleSecretsWaitingTimeout); default: UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite()); } @@ -1655,9 +1656,11 @@ private: } void DoExecute() { + TVector<TString> secretNames; for (const auto& transaction : Request.Transactions) { - if (!transaction.Body->GetSecretNames().empty()) { + for (const auto& secretName : transaction.Body->GetSecretNames()) { SecretSnapshotRequired = true; + secretNames.push_back(secretName); } for (const auto& stage : transaction.Body->GetStages()) { if (stage.SourcesSize() > 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kExternalSource) { @@ -1671,7 +1674,7 @@ private: return Execute(); } if (SecretSnapshotRequired) { - FetchSecrets(); + FetchSecrets(std::move(secretNames)); } if (ResourceSnapshotRequired) { GetResourcesSnapshot(); @@ -2412,10 +2415,10 @@ private: IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - const TActorId& creator) + const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime) { return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig, - std::move(asyncIoFactory), chanTransportVersion, creator); + std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime); } } // namespace NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h index 63c071f0650..d858cb15d32 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer.h +++ b/ydb/core/kqp/executer_actor/kqp_executer.h @@ -90,7 +90,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator); + const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, + TDuration maximalSecretsSnapshotWaitTime); IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc, diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp index 9d9af4f7730..ea5f10910a9 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp @@ -81,12 +81,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator) + const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, + TDuration maximalSecretsSnapshotWaitTime) { if (request.Transactions.empty()) { // commit-only or rollback-only data transaction YQL_ENSURE(request.LocksOp == ELocksOp::Commit || request.LocksOp == ELocksOp::Rollback); - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime); } TMaybe<NKqpProto::TKqpPhyTx::EType> txsType; @@ -102,13 +103,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt switch (*txsType) { case NKqpProto::TKqpPhyTx::TYPE_COMPUTE: case NKqpProto::TKqpPhyTx::TYPE_DATA: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime); case NKqpProto::TKqpPhyTx::TYPE_SCAN: - return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion); + return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime); case NKqpProto::TKqpPhyTx::TYPE_GENERIC: - return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator); + return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime); default: YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType); diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index 49810f3b8f7..734315dd1f4 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -115,7 +115,7 @@ public: TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, - ui64 spanVerbosity = 0, TString spanName = "no_name") + TDuration maximalSecretsSnapshotWaitTime, ui64 spanVerbosity = 0, TString spanName = "no_name") : Request(std::move(request)) , Database(database) , UserToken(userToken) @@ -123,6 +123,7 @@ public: , ExecuterSpan(spanVerbosity, std::move(Request.TraceId), spanName) , Planner(nullptr) , ExecuterRetriesConfig(executerRetriesConfig) + , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime) { TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId); TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>(); @@ -382,14 +383,35 @@ protected: return std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>(); } - void FetchSecrets() { + void FetchSecrets(TVector<TString>&& secretNames) { YQL_ENSURE(NMetadata::NProvider::TServiceOperator::IsEnabled(), "metadata service is not active"); + SecretNames = std::move(secretNames); + this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvSubscribeExternal(GetSecretsSnapshotParser())); + this->Schedule(MaximalSecretsSnapshotWaitTime, new NActors::TEvents::TEvWakeup()); } void HandleRefreshSubscriberData(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { - this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser())); Secrets = ev->Get()->GetSnapshotPtrAs<NMetadata::NSecret::TSnapshot>(); + + TString secretValue; + for (const TString& secretName : SecretNames) { + auto secretId = NMetadata::NSecret::TSecretId(UserToken->GetUserSID(), secretName); + if (!Secrets->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(secretId), secretValue)) { + return; + } + } + + UnsubscribeFromSecrets(); + } + + void HandleSecretsWaitingTimeout(NActors::TEvents::TEvWakeup::TPtr&) { + YQL_ENSURE(Secrets != nullptr, "secrets snapshot fetching timeout"); + UnsubscribeFromSecrets(); + } + + void UnsubscribeFromSecrets() { + this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser())); OnSecretsFetched(); } @@ -1246,6 +1268,8 @@ protected: const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig ExecuterRetriesConfig; std::shared_ptr<NMetadata::NSecret::TSnapshot> Secrets; + TVector<TString> SecretNames; + TDuration MaximalSecretsSnapshotWaitTime; private: static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100); @@ -1256,13 +1280,15 @@ private: IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator); + const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator, + TDuration maximalSecretsSnapshotWaitTime); IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion); + TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, + TDuration maximalSecretsSnapshotWaitTime); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index b640dc214ae..8179d8162c9 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -61,8 +61,9 @@ public: const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, TPreparedQueryHolder::TConstPtr preparedQuery, - const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion) - : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, TWilsonKqp::ScanExecuter, "ScanExecuter") + const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, + TDuration maximalSecretsSnapshotWaitTime) + : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, TWilsonKqp::ScanExecuter, "ScanExecuter") , PreparedQuery(preparedQuery) , AggregationSettings(aggregation) { @@ -710,10 +711,11 @@ IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, - TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion) + TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, + TDuration maximalSecretsSnapshotWaitTime) { return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, - preparedQuery, chanTransportVersion); + preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime); } } // namespace NKqp diff --git a/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h b/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h index 7df524a453b..5a5c75c46d4 100644 --- a/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h +++ b/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h @@ -132,10 +132,10 @@ struct TDescribeSecretsResponse { class TDescribeSecretsActor: public NActors::TActorBootstrapped<TDescribeSecretsActor> { STRICT_STFUNC(StateFunc, hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle); + hFunc(NActors::TEvents::TEvWakeup, Handle); ) void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { - Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser())); auto snapshot = ev->Get()->GetSnapshotAs<NMetadata::NSecret::TSnapshot>(); TVector<TString> secretValues; @@ -144,13 +144,21 @@ class TDescribeSecretsActor: public NActors::TActorBootstrapped<TDescribeSecrets TString secretValue; const bool isFound = snapshot->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(secretId), secretValue); if (!isFound) { - Promise.SetValue(TDescribeSecretsResponse(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("secret with name '" + secretId.GetSecretId() + "' not found") })); - PassAway(); + LastResponse = TDescribeSecretsResponse(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("secret with name '" + secretId.GetSecretId() + "' not found") }); return; } secretValues.push_back(secretValue); } Promise.SetValue(TDescribeSecretsResponse(secretValues)); + + UnsubscribeFromSecrets(); + PassAway(); + } + + void Handle(NActors::TEvents::TEvWakeup::TPtr&) { + Promise.SetValue(LastResponse); + + UnsubscribeFromSecrets(); PassAway(); } @@ -158,10 +166,16 @@ class TDescribeSecretsActor: public NActors::TActorBootstrapped<TDescribeSecrets return std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>(); } + void UnsubscribeFromSecrets() { + this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser())); + } + public: - TDescribeSecretsActor(const TString& ownerUserId, const TVector<TString>& secretIds, NThreading::TPromise<TDescribeSecretsResponse> promise) + TDescribeSecretsActor(const TString& ownerUserId, const TVector<TString>& secretIds, NThreading::TPromise<TDescribeSecretsResponse> promise, TDuration maximalSecretsSnapshotWaitTime) : SecretIds(CreateSecretIds(ownerUserId, secretIds)) , Promise(promise) + , LastResponse(Ydb::StatusIds::TIMEOUT, { NYql::TIssue("secrets snapshot fetching timeout") }) + , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime) {} void Bootstrap() { @@ -172,6 +186,7 @@ public: } this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvSubscribeExternal(GetSecretsSnapshotParser())); + this->Schedule(MaximalSecretsSnapshotWaitTime, new NActors::TEvents::TEvWakeup()); Become(&TDescribeSecretsActor::StateFunc); } @@ -187,6 +202,8 @@ private: private: const TVector<NMetadata::NSecret::TSecretId> SecretIds; NThreading::TPromise<TDescribeSecretsResponse> Promise; + TDescribeSecretsResponse LastResponse; + TDuration MaximalSecretsSnapshotWaitTime; }; } diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index c24d594a30d..9ba218f6269 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -418,13 +418,13 @@ void UpdateExternalDataSourceSecretsValue(TTableMetadataResult& externalDataSour } } -NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TActorSystem* actorSystem) { +NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TDuration maximalSecretsSnapshotWaitTime, TActorSystem* actorSystem) { const auto& authDescription = entry.ExternalDataSourceInfo->Description.GetAuth(); switch (authDescription.identity_case()) { case NKikimrSchemeOp::TAuth::kServiceAccount: { const TString& saSecretId = authDescription.GetServiceAccount().GetSecretName(); auto promise = NewPromise<TDescribeSecretsResponse>(); - actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId}, promise)); + actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId}, promise, maximalSecretsSnapshotWaitTime)); return promise.GetFuture(); } @@ -434,7 +434,7 @@ NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues case NKikimrSchemeOp::TAuth::kBasic: { const TString& passwordSecretId = authDescription.GetBasic().GetPasswordSecretName(); auto promise = NewPromise<TDescribeSecretsResponse>(); - actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {passwordSecretId}, promise)); + actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {passwordSecretId}, promise, maximalSecretsSnapshotWaitTime)); return promise.GetFuture(); } @@ -442,7 +442,7 @@ NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues const TString& saSecretId = authDescription.GetMdbBasic().GetServiceAccountSecretName(); const TString& passwordSecreId = authDescription.GetMdbBasic().GetPasswordSecretName(); auto promise = NewPromise<TDescribeSecretsResponse>(); - actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId, passwordSecreId}, promise)); + actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId, passwordSecreId}, promise, maximalSecretsSnapshotWaitTime)); return promise.GetFuture(); } @@ -450,7 +450,7 @@ NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues const TString& awsAccessKeyIdSecretId = authDescription.GetAws().GetAwsAccessKeyIdSecretName(); const TString& awsAccessKeyKeySecretId = authDescription.GetAws().GetAwsSecretAccessKeySecretName(); auto promise = NewPromise<TDescribeSecretsResponse>(); - actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, promise)); + actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, promise, maximalSecretsSnapshotWaitTime)); return promise.GetFuture(); } @@ -730,7 +730,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta promise.SetValue(externalDataSourceMetadata); return; } - LoadExternalDataSourceSecretValues(entry, userToken, ActorSystem) + LoadExternalDataSourceSecretValues(entry, userToken, MaximalSecretsSnapshotWaitTime, ActorSystem) .Subscribe([promise, externalDataSourceMetadata](const TFuture<TDescribeSecretsResponse>& result) mutable { UpdateExternalDataSourceSecretsValue(externalDataSourceMetadata, result.GetValue()); diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.h b/ydb/core/kqp/gateway/kqp_metadata_loader.h index 691f091989d..75a2887acb2 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.h +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.h @@ -18,11 +18,13 @@ public: explicit TKqpTableMetadataLoader(TActorSystem* actorSystem, NYql::TKikimrConfiguration::TPtr config, bool needCollectSchemeData = false, - TKqpTempTablesState::TConstPtr tempTablesState = nullptr) + TKqpTempTablesState::TConstPtr tempTablesState = nullptr, + TDuration maximalSecretsSnapshotWaitTime = TDuration::Seconds(20)) : NeedCollectSchemeData(needCollectSchemeData) , ActorSystem(actorSystem) , Config(config) , TempTablesState(std::move(tempTablesState)) + , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime) {}; NThreading::TFuture<NYql::IKikimrGateway::TTableMetadataResult> LoadTableMetadata( @@ -61,7 +63,7 @@ private: TActorSystem* ActorSystem; NYql::TKikimrConfiguration::TPtr Config; TKqpTempTablesState::TConstPtr TempTablesState; - + TDuration MaximalSecretsSnapshotWaitTime; }; } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp index 750c979e07a..d53eac6e60f 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp @@ -302,28 +302,10 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) { UNIT_ASSERT_VALUES_EQUAL(response.Metadata->Columns.size(), 2); } - void CreateSecretObject(const TString& secretId, const TString& secretValue, TSession& session, TTestActorRuntime* runtime) { + void CreateSecretObject(const TString& secretId, const TString& secretValue, TSession& session) { auto createSecretQuery = TStringBuilder() << "CREATE OBJECT " << secretId << " (TYPE SECRET) WITH value = `" << secretValue << "`;"; auto createSecretQueryResult = session.ExecuteSchemeQuery(createSecretQuery).GetValueSync(); UNIT_ASSERT_C(createSecretQueryResult.GetStatus() == NYdb::EStatus::SUCCESS, createSecretQueryResult.GetIssues().ToString()); - - TDuration maximalWaitTime = TDuration::Seconds(20); - TInstant start = TInstant::Now(); - bool created = false; - while (!created && TInstant::Now() - start <= maximalWaitTime) { - auto promise = NThreading::NewPromise<TDescribeSecretsResponse>(); - runtime->Register(new TDescribeSecretsActor("", {secretId}, promise)); - TDescribeSecretsResponse response = promise.GetFuture().GetValueSync(); - - if (response.Status == Ydb::StatusIds::SUCCESS) { - created = true; - break; - } - - Sleep(TDuration::Seconds(2)); - } - - UNIT_ASSERT_C(created, "Creating secret object timeout.\n"); } Y_UNIT_TEST(TestLoadServiceAccountSecretValueFromExternalDataSourceMetadata) { @@ -334,7 +316,7 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) { TString secretId = "mySaSecretId"; TString secretValue = "mySaSecretValue"; - CreateSecretObject(secretId, secretValue, session, kikimr.GetTestServer().GetRuntime()); + CreateSecretObject(secretId, secretValue, session); TString externalDataSourceName = "/Root/ExternalDataSource"; TString externalTableName = "/Root/ExternalTable"; @@ -372,7 +354,7 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) { TString secretId = "myPasswordSecretId"; TString secretValue = "pswd"; - CreateSecretObject(secretId, secretValue, session, kikimr.GetTestServer().GetRuntime()); + CreateSecretObject(secretId, secretValue, session); TString externalDataSourceName = "/Root/ExternalDataSource"; auto query = TStringBuilder() << R"( @@ -402,11 +384,11 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) { TString secretPasswordId = "myPasswordSecretId"; TString secretPasswordValue = "pswd"; - CreateSecretObject(secretPasswordId, secretPasswordValue, session, kikimr.GetTestServer().GetRuntime()); + CreateSecretObject(secretPasswordId, secretPasswordValue, session); TString secretSaId = "mySa"; TString secretSaValue = "sign(mySa)"; - CreateSecretObject(secretSaId, secretSaValue, session, kikimr.GetTestServer().GetRuntime()); + CreateSecretObject(secretSaId, secretSaValue, session); TString externalDataSourceName = "/Root/ExternalDataSource"; auto query = TStringBuilder() << R"( @@ -439,11 +421,11 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) { TString awsAccessKeyIdSecretId = "awsAccessKeyIdSecretId"; TString awsAccessKeyIdSecretValue = "key"; - CreateSecretObject(awsAccessKeyIdSecretId, awsAccessKeyIdSecretValue, session, kikimr.GetTestServer().GetRuntime()); + CreateSecretObject(awsAccessKeyIdSecretId, awsAccessKeyIdSecretValue, session); TString awsSecretAccessKeySecretId = "awsSecretAccessKeySecretId"; TString awsSecretAccessKeySecretValue = "value"; - CreateSecretObject(awsSecretAccessKeySecretId, awsSecretAccessKeySecretValue, session, kikimr.GetTestServer().GetRuntime()); + CreateSecretObject(awsSecretAccessKeySecretId, awsSecretAccessKeySecretValue, session); TString externalDataSourceName = "/Root/ExternalDataSource"; auto query = TStringBuilder() << R"( diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 9451e3193e7..b5594b83ad3 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -171,6 +171,7 @@ public: const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, TVector<NKikimrKqp::TKqpSetting>&& settings, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources) @@ -179,6 +180,7 @@ public: , TableServiceConfig(tableServiceConfig) , TokenAccessorConfig(tokenAccessorConfig) , QueryServiceConfig(queryServiceConfig) + , MetadataProviderConfig(metadataProviderConfig) , KqpSettings(std::make_shared<const TKqpSettings>(std::move(settings))) , QueryReplayFactory(std::move(queryReplayFactory)) , HttpGateway(NYql::IHTTPGateway::Make(&HttpGatewayConfig)) // TODO: pass config and counters @@ -238,7 +240,7 @@ public: } // Create compile service - CompileService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpCompileService(TableServiceConfig, + CompileService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpCompileService(TableServiceConfig, MetadataProviderConfig, KqpSettings, ModuleResolverState, Counters, std::move(QueryReplayFactory), CredentialsFactory, HttpGateway)); TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService( MakeKqpCompileServiceID(SelfId().NodeId()), CompileService); @@ -1363,7 +1365,7 @@ private: auto config = CreateConfig(KqpSettings, workerSettings); - IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, HttpGateway, AsyncIoFactory, CredentialsFactory, ModuleResolverState, Counters); + IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, HttpGateway, AsyncIoFactory, CredentialsFactory, ModuleResolverState, Counters, MetadataProviderConfig); auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId); TKqpSessionInfo* sessionInfo = LocalSessions->Create( sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration()); @@ -1559,6 +1561,7 @@ private: NKikimrConfig::TTableServiceConfig TableServiceConfig; NKikimrProto::TTokenAccessorConfig TokenAccessorConfig; NKikimrConfig::TQueryServiceConfig QueryServiceConfig; + NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig; TKqpSettings::TConstPtr KqpSettings; NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; std::shared_ptr<IQueryReplayBackendFactory> QueryReplayFactory; @@ -1613,11 +1616,12 @@ IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, TVector<NKikimrKqp::TKqpSetting>&& settings, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources) { - return new TKqpProxyService(logConfig, tableServiceConfig, tokenAccessorConfig, queryServiceConfig, std::move(settings), + return new TKqpProxyService(logConfig, tableServiceConfig, tokenAccessorConfig, queryServiceConfig, metadataProviderConfig, std::move(settings), std::move(queryReplayFactory),std::move(kqpProxySharedResources)); } diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.h b/ydb/core/kqp/proxy_service/kqp_proxy_service.h index c2ac2de2c88..648a6010e7d 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.h @@ -51,6 +51,7 @@ IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig, const NKikimrConfig::TTableServiceConfig& tableServiceConfig, const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, TVector<NKikimrKqp::TKqpSetting>&& settings, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory, std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index c28d4d46200..a5060c0ad79 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -152,7 +152,8 @@ public: const TKqpWorkerSettings& workerSettings, NYql::IHTTPGateway::TPtr httpGateway, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters) + TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig) : Owner(owner) , SessionId(sessionId) , Counters(counters) @@ -164,6 +165,7 @@ public: , KqpSettings(kqpSettings) , Config(CreateConfig(kqpSettings, workerSettings)) , Transactions(*Config->_KqpMaxActiveTxPerSession.Get(), TDuration::Seconds(*Config->_KqpTxIdleTimeoutSec.Get())) + , MetadataProviderConfig(metadataProviderConfig) { RequestCounters = MakeIntrusive<TKqpRequestCounters>(); RequestCounters->Counters = Counters; @@ -220,7 +222,7 @@ public: void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) { if (!WorkerId) { std::unique_ptr<IActor> workerActor(CreateKqpWorkerActor(SelfId(), SessionId, KqpSettings, Settings, - HttpGateway, ModuleResolverState, Counters, CredentialsFactory)); + HttpGateway, ModuleResolverState, Counters, CredentialsFactory, MetadataProviderConfig)); WorkerId = RegisterWithSameMailbox(workerActor.release()); } TlsActivationContext->Send(new IEventHandle(*WorkerId, SelfId(), QueryState->RequestEv.release(), ev->Flags, ev->Cookie, @@ -1046,7 +1048,7 @@ public: auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database, QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(), RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(), - AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId()); + AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId(), 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds())); auto exId = RegisterWithSameMailbox(executerActor); LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback); @@ -2149,6 +2151,8 @@ private: NTxProxy::TRequestControls RequestControls; TKqpTempTablesState TempTablesState; + + NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig; }; } // namespace @@ -2157,9 +2161,10 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, NYql::IHTTPGateway::TPtr httpGateway, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters) + TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig) { - return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), std::move(asyncIoFactory), std::move(credentialsFactory), std::move(moduleResolverState), counters); + return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), std::move(asyncIoFactory), std::move(credentialsFactory), std::move(moduleResolverState), counters, metadataProviderConfig); } } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h index 9015272288a..fd6f9fd381b 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.h +++ b/ydb/core/kqp/session_actor/kqp_session_actor.h @@ -37,7 +37,8 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, NYql::IHTTPGateway::TPtr httpGateway, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, - TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters); + TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig); IActor* CreateKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target); diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index 3fe99ef21fe..6cf0b17d17a 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -97,7 +97,8 @@ public: TKqpWorkerActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) + NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig) : Owner(owner) , SessionId(sessionId) , Settings(workerSettings) @@ -106,6 +107,7 @@ public: , Counters(counters) , CredentialsFactory(std::move(credentialsFactory)) , Config(MakeIntrusive<TKikimrConfiguration>()) + , MetadataProviderConfig(metadataProviderConfig) , CreationTime(TInstant::Now()) , QueryId(0) , ShutdownState(std::nullopt) @@ -134,7 +136,7 @@ public: Counters->ReportWorkerCreated(Settings.DbCounters); std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>( - TlsActivationContext->ActorSystem(), Config, false); + TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds())); Gateway = CreateKikimrIcGateway(Settings.Cluster, Settings.Database, std::move(loader), ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters); @@ -1064,6 +1066,7 @@ private: NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory; TIntrusivePtr<TKqpRequestCounters> RequestCounters; TKikimrConfiguration::TPtr Config; + NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig; TInstant CreationTime; TIntrusivePtr<IKqpGateway> Gateway; TIntrusivePtr<IKqpHost> KqpHost; @@ -1079,9 +1082,10 @@ IActor* CreateKqpWorkerActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters, - NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) + NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig) { - return new TKqpWorkerActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), moduleResolverState, counters, std::move(credentialsFactory)); + return new TKqpWorkerActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), moduleResolverState, counters, std::move(credentialsFactory), metadataProviderConfig); } } // namespace NKqp diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.h b/ydb/core/kqp/session_actor/kqp_worker_common.h index 1fecdefbcc4..c7254a4fa8a 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_common.h +++ b/ydb/core/kqp/session_actor/kqp_worker_common.h @@ -148,7 +148,8 @@ bool HasSchemeOrFatalIssues(const NYql::TIssues& issues); IActor* CreateKqpWorkerActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings, NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState, - TIntrusivePtr<TKqpCounters> counters, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); + TIntrusivePtr<TKqpCounters> counters, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig); bool IsSameProtoType(const NKikimrMiniKQL::TType& actual, const NKikimrMiniKQL::TType& expected); diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 6d98f1b4343..da0510df57f 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -821,6 +821,7 @@ namespace Tests { Settings->AppConfig.GetTableServiceConfig(), Settings->AppConfig.GetAuthConfig().GetTokenAccessorConfig(), Settings->AppConfig.GetQueryServiceConfig(), + Settings->AppConfig.GetMetadataProviderConfig(), TVector<NKikimrKqp::TKqpSetting>(Settings->KqpSettings), nullptr, std::move(kqpProxySharedResources)); TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx); |