aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgrigoriypisar <grigoriypisar@yandex-team.com>2023-08-31 11:17:21 +0300
committergrigoriypisar <grigoriypisar@yandex-team.com>2023-08-31 11:50:30 +0300
commita9e6862e2058d283ab327787eb4a85533fb81fdd (patch)
tree0169acf29c33a0dabb055e79febbd67705d5edee
parent318289efe1e924fb92c6f15642ded8d5b7b0713c (diff)
downloadydb-a9e6862e2058d283ab327787eb4a85533fb81fdd.tar.gz
Added secrets snapshot waiting
Added secrets snapshot waiting timeout.
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp2
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_actor.cpp14
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.cpp12
-rw-r--r--ydb/core/kqp/compile_service/kqp_compile_service.h4
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp15
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer.h3
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.cpp11
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h36
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp10
-rw-r--r--ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h25
-rw-r--r--ydb/core/kqp/gateway/kqp_metadata_loader.cpp12
-rw-r--r--ydb/core/kqp/gateway/kqp_metadata_loader.h6
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp32
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp10
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.h1
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.cpp15
-rw-r--r--ydb/core/kqp/session_actor/kqp_session_actor.h3
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_actor.cpp12
-rw-r--r--ydb/core/kqp/session_actor/kqp_worker_common.h3
-rw-r--r--ydb/core/testlib/test_client.cpp1
20 files changed, 143 insertions, 84 deletions
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 919b2a9a6e9..89b1f8b7faa 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -2106,7 +2106,7 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
new NYql::NLog::TTlsLogBackend(new TNullLogBackend())));
auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(), Config.GetAuthConfig().GetTokenAccessorConfig(),
- Config.GetQueryServiceConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources));
+ Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources));
setup->LocalServices.push_back(std::make_pair(
NKqp::MakeKqpProxyID(NodeId),
TActorSetupCmd(proxy, TMailboxType::HTSwap, appData->UserPoolId)));
diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
index b2121dc5141..7de2211aa82 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp
@@ -57,8 +57,8 @@ public:
return NKikimrServices::TActivity::KQP_COMPILE_ACTOR;
}
- TKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
- const TTableServiceConfig& serviceConfig, NYql::IHTTPGateway::TPtr httpGateway,
+ TKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, const TTableServiceConfig& serviceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, NYql::IHTTPGateway::TPtr httpGateway,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& uid, const TKqpQueryId& queryId,
@@ -75,6 +75,7 @@ public:
, UserToken(userToken)
, DbCounters(dbCounters)
, Config(MakeIntrusive<TKikimrConfiguration>())
+ , MetadataProviderConfig(metadataProviderConfig)
, CompilationTimeout(TDuration::MilliSeconds(serviceConfig.GetCompileTimeoutMs()))
, CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor")
, TempTablesState(std::move(tempTablesState))
@@ -117,7 +118,7 @@ public:
counters->TxProxyMon = new NTxProxy::TTxProxyMon(AppData(ctx)->Counters);
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader =
std::make_shared<TKqpTableMetadataLoader>(
- TlsActivationContext->ActorSystem(), Config, true, TempTablesState);
+ TlsActivationContext->ActorSystem(), Config, true, TempTablesState, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
Gateway = CreateKikimrIcGateway(QueryId.Cluster, QueryId.Database, std::move(loader),
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), counters);
Gateway->SetToken(QueryId.Cluster, UserToken);
@@ -379,6 +380,7 @@ private:
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TKqpDbCountersPtr DbCounters;
TKikimrConfiguration::TPtr Config;
+ NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig;
TDuration CompilationTimeout;
TInstant StartTime;
TDuration CompileCpuTime;
@@ -418,14 +420,14 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.PredicateExtract20 = serviceConfig.GetPredicateExtract20();
}
-IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
- const TTableServiceConfig& serviceConfig, NYql::IHTTPGateway::TPtr httpGateway,
+IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings, const TTableServiceConfig& serviceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, NYql::IHTTPGateway::TPtr httpGateway,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& uid, const TKqpQueryId& query, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpDbCountersPtr dbCounters, NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState)
{
- return new TKqpCompileActor(owner, kqpSettings, serviceConfig, std::move(httpGateway), moduleResolverState,
+ return new TKqpCompileActor(owner, kqpSettings, serviceConfig, metadataProviderConfig, std::move(httpGateway), moduleResolverState,
counters, std::move(credentialsFactory),
uid, query, userToken, dbCounters, std::move(traceId), std::move(tempTablesState));
}
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
index 3bd81e846f3..960fdc85151 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
@@ -307,12 +307,14 @@ public:
return NKikimrServices::TActivity::KQP_COMPILE_SERVICE;
}
- TKqpCompileService(const TTableServiceConfig& serviceConfig, const TKqpSettings::TConstPtr& kqpSettings,
+ TKqpCompileService(const TTableServiceConfig& serviceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, const TKqpSettings::TConstPtr& kqpSettings,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NYql::IHTTPGateway::TPtr httpGateway)
: Config(serviceConfig)
+ , MetadataProviderConfig(metadataProviderConfig)
, KqpSettings(kqpSettings)
, ModuleResolverState(moduleResolverState)
, Counters(counters)
@@ -737,7 +739,7 @@ private:
}
void StartCompilation(TKqpCompileRequest&& request, const TActorContext& ctx) {
- auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, Config, HttpGateway, ModuleResolverState, Counters,
+ auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, Config, MetadataProviderConfig, HttpGateway, ModuleResolverState, Counters,
CredentialsFactory, request.Uid, request.Query, request.UserToken, request.DbCounters, request.CompileServiceSpan.GetTraceId(), std::move(request.TempTablesState));
auto compileActorId = ctx.ExecutorThread.RegisterActor(compileActor, TMailboxType::HTSwap,
AppData(ctx)->UserPoolId);
@@ -817,6 +819,7 @@ private:
private:
TTableServiceConfig Config;
+ NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig;
TKqpSettings::TConstPtr KqpSettings;
TIntrusivePtr<TModuleResolverState> ModuleResolverState;
TIntrusivePtr<TKqpCounters> Counters;
@@ -829,13 +832,14 @@ private:
NYql::IHTTPGateway::TPtr HttpGateway;
};
-IActor* CreateKqpCompileService(const TTableServiceConfig& serviceConfig, const TKqpSettings::TConstPtr& kqpSettings,
+IActor* CreateKqpCompileService(const TTableServiceConfig& serviceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, const TKqpSettings::TConstPtr& kqpSettings,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NYql::IHTTPGateway::TPtr httpGateway)
{
- return new TKqpCompileService(serviceConfig, kqpSettings, moduleResolverState, counters,
+ return new TKqpCompileService(serviceConfig, metadataProviderConfig, kqpSettings, moduleResolverState, counters,
std::move(queryReplayFactory), std::move(credentialsFactory), std::move(httpGateway));
}
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.h b/ydb/core/kqp/compile_service/kqp_compile_service.h
index 780ad843d18..32c28748e19 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.h
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.h
@@ -10,6 +10,7 @@ namespace NKikimr {
namespace NKqp {
IActor* CreateKqpCompileService(const NKikimrConfig::TTableServiceConfig& serviceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
const TKqpSettings::TConstPtr& kqpSettings, TIntrusivePtr<TModuleResolverState> moduleResolverState,
TIntrusivePtr<TKqpCounters> counters, std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
@@ -19,7 +20,8 @@ IActor* CreateKqpCompileComputationPatternService(const NKikimrConfig::TTableSer
TIntrusivePtr<TKqpCounters> counters);
IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstPtr& kqpSettings,
- const NKikimrConfig::TTableServiceConfig& serviceConfig, NYql::IHTTPGateway::TPtr httpGateway,
+ const NKikimrConfig::TTableServiceConfig& serviceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, NYql::IHTTPGateway::TPtr httpGateway,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
const TString& uid, const TKqpQueryId& query,
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index 95314b54fd7..ff6089f90fa 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -128,8 +128,8 @@ public:
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
- const TActorId& creator)
- : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, TWilsonKqp::DataExecuter, "DataExecuter")
+ const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime)
+ : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, TWilsonKqp::DataExecuter, "DataExecuter")
, AsyncIoFactory(std::move(asyncIoFactory))
, StreamResult(streamResult)
{
@@ -295,6 +295,7 @@ public:
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, HandleRefreshSubscriberData);
hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve);
+ hFunc(NActors::TEvents::TEvWakeup, HandleSecretsWaitingTimeout);
default:
UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite());
}
@@ -1655,9 +1656,11 @@ private:
}
void DoExecute() {
+ TVector<TString> secretNames;
for (const auto& transaction : Request.Transactions) {
- if (!transaction.Body->GetSecretNames().empty()) {
+ for (const auto& secretName : transaction.Body->GetSecretNames()) {
SecretSnapshotRequired = true;
+ secretNames.push_back(secretName);
}
for (const auto& stage : transaction.Body->GetStages()) {
if (stage.SourcesSize() > 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kExternalSource) {
@@ -1671,7 +1674,7 @@ private:
return Execute();
}
if (SecretSnapshotRequired) {
- FetchSecrets();
+ FetchSecrets(std::move(secretNames));
}
if (ResourceSnapshotRequired) {
GetResourcesSnapshot();
@@ -2412,10 +2415,10 @@ private:
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
- const TActorId& creator)
+ const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime)
{
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig,
- std::move(asyncIoFactory), chanTransportVersion, creator);
+ std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index 63c071f0650..d858cb15d32 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -90,7 +90,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
- const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator);
+ const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
+ TDuration maximalSecretsSnapshotWaitTime);
IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, const TActorId& target, const TString& database,
TIntrusiveConstPtr<NACLib::TUserToken> userToken, NKikimr::NKqp::TTxAllocatorState::TPtr txAlloc,
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index 9d9af4f7730..ea5f10910a9 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -81,12 +81,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
- const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator)
+ const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
+ TDuration maximalSecretsSnapshotWaitTime)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
YQL_ENSURE(request.LocksOp == ELocksOp::Commit || request.LocksOp == ELocksOp::Rollback);
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime);
}
TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
@@ -102,13 +103,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
switch (*txsType) {
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime);
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
- return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion);
+ return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime);
case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime);
default:
YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 49810f3b8f7..734315dd1f4 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -115,7 +115,7 @@ public:
TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
- ui64 spanVerbosity = 0, TString spanName = "no_name")
+ TDuration maximalSecretsSnapshotWaitTime, ui64 spanVerbosity = 0, TString spanName = "no_name")
: Request(std::move(request))
, Database(database)
, UserToken(userToken)
@@ -123,6 +123,7 @@ public:
, ExecuterSpan(spanVerbosity, std::move(Request.TraceId), spanName)
, Planner(nullptr)
, ExecuterRetriesConfig(executerRetriesConfig)
+ , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
{
TasksGraph.GetMeta().Snapshot = IKqpGateway::TKqpSnapshot(Request.Snapshot.Step, Request.Snapshot.TxId);
TasksGraph.GetMeta().Arena = MakeIntrusive<NActors::TProtoArenaHolder>();
@@ -382,14 +383,35 @@ protected:
return std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>();
}
- void FetchSecrets() {
+ void FetchSecrets(TVector<TString>&& secretNames) {
YQL_ENSURE(NMetadata::NProvider::TServiceOperator::IsEnabled(), "metadata service is not active");
+ SecretNames = std::move(secretNames);
+
this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvSubscribeExternal(GetSecretsSnapshotParser()));
+ this->Schedule(MaximalSecretsSnapshotWaitTime, new NActors::TEvents::TEvWakeup());
}
void HandleRefreshSubscriberData(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) {
- this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser()));
Secrets = ev->Get()->GetSnapshotPtrAs<NMetadata::NSecret::TSnapshot>();
+
+ TString secretValue;
+ for (const TString& secretName : SecretNames) {
+ auto secretId = NMetadata::NSecret::TSecretId(UserToken->GetUserSID(), secretName);
+ if (!Secrets->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(secretId), secretValue)) {
+ return;
+ }
+ }
+
+ UnsubscribeFromSecrets();
+ }
+
+ void HandleSecretsWaitingTimeout(NActors::TEvents::TEvWakeup::TPtr&) {
+ YQL_ENSURE(Secrets != nullptr, "secrets snapshot fetching timeout");
+ UnsubscribeFromSecrets();
+ }
+
+ void UnsubscribeFromSecrets() {
+ this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser()));
OnSecretsFetched();
}
@@ -1246,6 +1268,8 @@ protected:
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig ExecuterRetriesConfig;
std::shared_ptr<NMetadata::NSecret::TSnapshot> Secrets;
+ TVector<TString> SecretNames;
+ TDuration MaximalSecretsSnapshotWaitTime;
private:
static constexpr TDuration ResourceUsageUpdateInterval = TDuration::MilliSeconds(100);
@@ -1256,13 +1280,15 @@ private:
IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters, bool streamResult,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
- const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator);
+ const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
+ TDuration maximalSecretsSnapshotWaitTime);
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion);
+ TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
+ TDuration maximalSecretsSnapshotWaitTime);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index b640dc214ae..8179d8162c9 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -61,8 +61,9 @@ public:
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
TPreparedQueryHolder::TConstPtr preparedQuery,
- const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion)
- : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, TWilsonKqp::ScanExecuter, "ScanExecuter")
+ const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
+ TDuration maximalSecretsSnapshotWaitTime)
+ : TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, maximalSecretsSnapshotWaitTime, TWilsonKqp::ScanExecuter, "ScanExecuter")
, PreparedQuery(preparedQuery)
, AggregationSettings(aggregation)
{
@@ -710,10 +711,11 @@ IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
- TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion)
+ TPreparedQueryHolder::TConstPtr preparedQuery, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
+ TDuration maximalSecretsSnapshotWaitTime)
{
return new TKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig,
- preparedQuery, chanTransportVersion);
+ preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h b/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h
index 7df524a453b..5a5c75c46d4 100644
--- a/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h
+++ b/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h
@@ -132,10 +132,10 @@ struct TDescribeSecretsResponse {
class TDescribeSecretsActor: public NActors::TActorBootstrapped<TDescribeSecretsActor> {
STRICT_STFUNC(StateFunc,
hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle);
+ hFunc(NActors::TEvents::TEvWakeup, Handle);
)
void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) {
- Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser()));
auto snapshot = ev->Get()->GetSnapshotAs<NMetadata::NSecret::TSnapshot>();
TVector<TString> secretValues;
@@ -144,13 +144,21 @@ class TDescribeSecretsActor: public NActors::TActorBootstrapped<TDescribeSecrets
TString secretValue;
const bool isFound = snapshot->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(secretId), secretValue);
if (!isFound) {
- Promise.SetValue(TDescribeSecretsResponse(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("secret with name '" + secretId.GetSecretId() + "' not found") }));
- PassAway();
+ LastResponse = TDescribeSecretsResponse(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("secret with name '" + secretId.GetSecretId() + "' not found") });
return;
}
secretValues.push_back(secretValue);
}
Promise.SetValue(TDescribeSecretsResponse(secretValues));
+
+ UnsubscribeFromSecrets();
+ PassAway();
+ }
+
+ void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
+ Promise.SetValue(LastResponse);
+
+ UnsubscribeFromSecrets();
PassAway();
}
@@ -158,10 +166,16 @@ class TDescribeSecretsActor: public NActors::TActorBootstrapped<TDescribeSecrets
return std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>();
}
+ void UnsubscribeFromSecrets() {
+ this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser()));
+ }
+
public:
- TDescribeSecretsActor(const TString& ownerUserId, const TVector<TString>& secretIds, NThreading::TPromise<TDescribeSecretsResponse> promise)
+ TDescribeSecretsActor(const TString& ownerUserId, const TVector<TString>& secretIds, NThreading::TPromise<TDescribeSecretsResponse> promise, TDuration maximalSecretsSnapshotWaitTime)
: SecretIds(CreateSecretIds(ownerUserId, secretIds))
, Promise(promise)
+ , LastResponse(Ydb::StatusIds::TIMEOUT, { NYql::TIssue("secrets snapshot fetching timeout") })
+ , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
{}
void Bootstrap() {
@@ -172,6 +186,7 @@ public:
}
this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvSubscribeExternal(GetSecretsSnapshotParser()));
+ this->Schedule(MaximalSecretsSnapshotWaitTime, new NActors::TEvents::TEvWakeup());
Become(&TDescribeSecretsActor::StateFunc);
}
@@ -187,6 +202,8 @@ private:
private:
const TVector<NMetadata::NSecret::TSecretId> SecretIds;
NThreading::TPromise<TDescribeSecretsResponse> Promise;
+ TDescribeSecretsResponse LastResponse;
+ TDuration MaximalSecretsSnapshotWaitTime;
};
}
diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
index c24d594a30d..9ba218f6269 100644
--- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
+++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
@@ -418,13 +418,13 @@ void UpdateExternalDataSourceSecretsValue(TTableMetadataResult& externalDataSour
}
}
-NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TActorSystem* actorSystem) {
+NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TDuration maximalSecretsSnapshotWaitTime, TActorSystem* actorSystem) {
const auto& authDescription = entry.ExternalDataSourceInfo->Description.GetAuth();
switch (authDescription.identity_case()) {
case NKikimrSchemeOp::TAuth::kServiceAccount: {
const TString& saSecretId = authDescription.GetServiceAccount().GetSecretName();
auto promise = NewPromise<TDescribeSecretsResponse>();
- actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId}, promise));
+ actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId}, promise, maximalSecretsSnapshotWaitTime));
return promise.GetFuture();
}
@@ -434,7 +434,7 @@ NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues
case NKikimrSchemeOp::TAuth::kBasic: {
const TString& passwordSecretId = authDescription.GetBasic().GetPasswordSecretName();
auto promise = NewPromise<TDescribeSecretsResponse>();
- actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {passwordSecretId}, promise));
+ actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {passwordSecretId}, promise, maximalSecretsSnapshotWaitTime));
return promise.GetFuture();
}
@@ -442,7 +442,7 @@ NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues
const TString& saSecretId = authDescription.GetMdbBasic().GetServiceAccountSecretName();
const TString& passwordSecreId = authDescription.GetMdbBasic().GetPasswordSecretName();
auto promise = NewPromise<TDescribeSecretsResponse>();
- actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId, passwordSecreId}, promise));
+ actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId, passwordSecreId}, promise, maximalSecretsSnapshotWaitTime));
return promise.GetFuture();
}
@@ -450,7 +450,7 @@ NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues
const TString& awsAccessKeyIdSecretId = authDescription.GetAws().GetAwsAccessKeyIdSecretName();
const TString& awsAccessKeyKeySecretId = authDescription.GetAws().GetAwsSecretAccessKeySecretName();
auto promise = NewPromise<TDescribeSecretsResponse>();
- actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, promise));
+ actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, promise, maximalSecretsSnapshotWaitTime));
return promise.GetFuture();
}
@@ -730,7 +730,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
promise.SetValue(externalDataSourceMetadata);
return;
}
- LoadExternalDataSourceSecretValues(entry, userToken, ActorSystem)
+ LoadExternalDataSourceSecretValues(entry, userToken, MaximalSecretsSnapshotWaitTime, ActorSystem)
.Subscribe([promise, externalDataSourceMetadata](const TFuture<TDescribeSecretsResponse>& result) mutable
{
UpdateExternalDataSourceSecretsValue(externalDataSourceMetadata, result.GetValue());
diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.h b/ydb/core/kqp/gateway/kqp_metadata_loader.h
index 691f091989d..75a2887acb2 100644
--- a/ydb/core/kqp/gateway/kqp_metadata_loader.h
+++ b/ydb/core/kqp/gateway/kqp_metadata_loader.h
@@ -18,11 +18,13 @@ public:
explicit TKqpTableMetadataLoader(TActorSystem* actorSystem,
NYql::TKikimrConfiguration::TPtr config,
bool needCollectSchemeData = false,
- TKqpTempTablesState::TConstPtr tempTablesState = nullptr)
+ TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
+ TDuration maximalSecretsSnapshotWaitTime = TDuration::Seconds(20))
: NeedCollectSchemeData(needCollectSchemeData)
, ActorSystem(actorSystem)
, Config(config)
, TempTablesState(std::move(tempTablesState))
+ , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
{};
NThreading::TFuture<NYql::IKikimrGateway::TTableMetadataResult> LoadTableMetadata(
@@ -61,7 +63,7 @@ private:
TActorSystem* ActorSystem;
NYql::TKikimrConfiguration::TPtr Config;
TKqpTempTablesState::TConstPtr TempTablesState;
-
+ TDuration MaximalSecretsSnapshotWaitTime;
};
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
index 750c979e07a..d53eac6e60f 100644
--- a/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_gateway_ut.cpp
@@ -302,28 +302,10 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
UNIT_ASSERT_VALUES_EQUAL(response.Metadata->Columns.size(), 2);
}
- void CreateSecretObject(const TString& secretId, const TString& secretValue, TSession& session, TTestActorRuntime* runtime) {
+ void CreateSecretObject(const TString& secretId, const TString& secretValue, TSession& session) {
auto createSecretQuery = TStringBuilder() << "CREATE OBJECT " << secretId << " (TYPE SECRET) WITH value = `" << secretValue << "`;";
auto createSecretQueryResult = session.ExecuteSchemeQuery(createSecretQuery).GetValueSync();
UNIT_ASSERT_C(createSecretQueryResult.GetStatus() == NYdb::EStatus::SUCCESS, createSecretQueryResult.GetIssues().ToString());
-
- TDuration maximalWaitTime = TDuration::Seconds(20);
- TInstant start = TInstant::Now();
- bool created = false;
- while (!created && TInstant::Now() - start <= maximalWaitTime) {
- auto promise = NThreading::NewPromise<TDescribeSecretsResponse>();
- runtime->Register(new TDescribeSecretsActor("", {secretId}, promise));
- TDescribeSecretsResponse response = promise.GetFuture().GetValueSync();
-
- if (response.Status == Ydb::StatusIds::SUCCESS) {
- created = true;
- break;
- }
-
- Sleep(TDuration::Seconds(2));
- }
-
- UNIT_ASSERT_C(created, "Creating secret object timeout.\n");
}
Y_UNIT_TEST(TestLoadServiceAccountSecretValueFromExternalDataSourceMetadata) {
@@ -334,7 +316,7 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
TString secretId = "mySaSecretId";
TString secretValue = "mySaSecretValue";
- CreateSecretObject(secretId, secretValue, session, kikimr.GetTestServer().GetRuntime());
+ CreateSecretObject(secretId, secretValue, session);
TString externalDataSourceName = "/Root/ExternalDataSource";
TString externalTableName = "/Root/ExternalTable";
@@ -372,7 +354,7 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
TString secretId = "myPasswordSecretId";
TString secretValue = "pswd";
- CreateSecretObject(secretId, secretValue, session, kikimr.GetTestServer().GetRuntime());
+ CreateSecretObject(secretId, secretValue, session);
TString externalDataSourceName = "/Root/ExternalDataSource";
auto query = TStringBuilder() << R"(
@@ -402,11 +384,11 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
TString secretPasswordId = "myPasswordSecretId";
TString secretPasswordValue = "pswd";
- CreateSecretObject(secretPasswordId, secretPasswordValue, session, kikimr.GetTestServer().GetRuntime());
+ CreateSecretObject(secretPasswordId, secretPasswordValue, session);
TString secretSaId = "mySa";
TString secretSaValue = "sign(mySa)";
- CreateSecretObject(secretSaId, secretSaValue, session, kikimr.GetTestServer().GetRuntime());
+ CreateSecretObject(secretSaId, secretSaValue, session);
TString externalDataSourceName = "/Root/ExternalDataSource";
auto query = TStringBuilder() << R"(
@@ -439,11 +421,11 @@ Y_UNIT_TEST_SUITE(KikimrIcGateway) {
TString awsAccessKeyIdSecretId = "awsAccessKeyIdSecretId";
TString awsAccessKeyIdSecretValue = "key";
- CreateSecretObject(awsAccessKeyIdSecretId, awsAccessKeyIdSecretValue, session, kikimr.GetTestServer().GetRuntime());
+ CreateSecretObject(awsAccessKeyIdSecretId, awsAccessKeyIdSecretValue, session);
TString awsSecretAccessKeySecretId = "awsSecretAccessKeySecretId";
TString awsSecretAccessKeySecretValue = "value";
- CreateSecretObject(awsSecretAccessKeySecretId, awsSecretAccessKeySecretValue, session, kikimr.GetTestServer().GetRuntime());
+ CreateSecretObject(awsSecretAccessKeySecretId, awsSecretAccessKeySecretValue, session);
TString externalDataSourceName = "/Root/ExternalDataSource";
auto query = TStringBuilder() << R"(
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 9451e3193e7..b5594b83ad3 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -171,6 +171,7 @@ public:
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
TVector<NKikimrKqp::TKqpSetting>&& settings,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
std::shared_ptr<TKqpProxySharedResources>&& kqpProxySharedResources)
@@ -179,6 +180,7 @@ public:
, TableServiceConfig(tableServiceConfig)
, TokenAccessorConfig(tokenAccessorConfig)
, QueryServiceConfig(queryServiceConfig)
+ , MetadataProviderConfig(metadataProviderConfig)
, KqpSettings(std::make_shared<const TKqpSettings>(std::move(settings)))
, QueryReplayFactory(std::move(queryReplayFactory))
, HttpGateway(NYql::IHTTPGateway::Make(&HttpGatewayConfig)) // TODO: pass config and counters
@@ -238,7 +240,7 @@ public:
}
// Create compile service
- CompileService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpCompileService(TableServiceConfig,
+ CompileService = TlsActivationContext->ExecutorThread.RegisterActor(CreateKqpCompileService(TableServiceConfig, MetadataProviderConfig,
KqpSettings, ModuleResolverState, Counters, std::move(QueryReplayFactory), CredentialsFactory, HttpGateway));
TlsActivationContext->ExecutorThread.ActorSystem->RegisterLocalService(
MakeKqpCompileServiceID(SelfId().NodeId()), CompileService);
@@ -1363,7 +1365,7 @@ private:
auto config = CreateConfig(KqpSettings, workerSettings);
- IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, HttpGateway, AsyncIoFactory, CredentialsFactory, ModuleResolverState, Counters);
+ IActor* sessionActor = CreateKqpSessionActor(SelfId(), sessionId, KqpSettings, workerSettings, HttpGateway, AsyncIoFactory, CredentialsFactory, ModuleResolverState, Counters, MetadataProviderConfig);
auto workerId = TlsActivationContext->ExecutorThread.RegisterActor(sessionActor, TMailboxType::HTSwap, AppData()->UserPoolId);
TKqpSessionInfo* sessionInfo = LocalSessions->Create(
sessionId, workerId, database, dbCounters, supportsBalancing, GetSessionIdleDuration());
@@ -1559,6 +1561,7 @@ private:
NKikimrConfig::TTableServiceConfig TableServiceConfig;
NKikimrProto::TTokenAccessorConfig TokenAccessorConfig;
NKikimrConfig::TQueryServiceConfig QueryServiceConfig;
+ NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig;
TKqpSettings::TConstPtr KqpSettings;
NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
std::shared_ptr<IQueryReplayBackendFactory> QueryReplayFactory;
@@ -1613,11 +1616,12 @@ IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
TVector<NKikimrKqp::TKqpSetting>&& settings,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources)
{
- return new TKqpProxyService(logConfig, tableServiceConfig, tokenAccessorConfig, queryServiceConfig, std::move(settings),
+ return new TKqpProxyService(logConfig, tableServiceConfig, tokenAccessorConfig, queryServiceConfig, metadataProviderConfig, std::move(settings),
std::move(queryReplayFactory),std::move(kqpProxySharedResources));
}
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.h b/ydb/core/kqp/proxy_service/kqp_proxy_service.h
index c2ac2de2c88..648a6010e7d 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.h
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.h
@@ -51,6 +51,7 @@ IActor* CreateKqpProxyService(const NKikimrConfig::TLogConfig& logConfig,
const NKikimrConfig::TTableServiceConfig& tableServiceConfig,
const NKikimrProto::TTokenAccessorConfig& tokenAccessorConfig,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
TVector<NKikimrKqp::TKqpSetting>&& settings,
std::shared_ptr<IQueryReplayBackendFactory> queryReplayFactory,
std::shared_ptr<TKqpProxySharedResources> kqpProxySharedResources);
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index c28d4d46200..a5060c0ad79 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -152,7 +152,8 @@ public:
const TKqpWorkerSettings& workerSettings, NYql::IHTTPGateway::TPtr httpGateway,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters)
+ TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig)
: Owner(owner)
, SessionId(sessionId)
, Counters(counters)
@@ -164,6 +165,7 @@ public:
, KqpSettings(kqpSettings)
, Config(CreateConfig(kqpSettings, workerSettings))
, Transactions(*Config->_KqpMaxActiveTxPerSession.Get(), TDuration::Seconds(*Config->_KqpTxIdleTimeoutSec.Get()))
+ , MetadataProviderConfig(metadataProviderConfig)
{
RequestCounters = MakeIntrusive<TKqpRequestCounters>();
RequestCounters->Counters = Counters;
@@ -220,7 +222,7 @@ public:
void ForwardRequest(TEvKqp::TEvQueryRequest::TPtr& ev) {
if (!WorkerId) {
std::unique_ptr<IActor> workerActor(CreateKqpWorkerActor(SelfId(), SessionId, KqpSettings, Settings,
- HttpGateway, ModuleResolverState, Counters, CredentialsFactory));
+ HttpGateway, ModuleResolverState, Counters, CredentialsFactory, MetadataProviderConfig));
WorkerId = RegisterWithSameMailbox(workerActor.release());
}
TlsActivationContext->Send(new IEventHandle(*WorkerId, SelfId(), QueryState->RequestEv.release(), ev->Flags, ev->Cookie,
@@ -1046,7 +1048,7 @@ public:
auto executerActor = CreateKqpExecuter(std::move(request), Settings.Database,
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(),
- AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId());
+ AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId(), 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
auto exId = RegisterWithSameMailbox(executerActor);
LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);
@@ -2149,6 +2151,8 @@ private:
NTxProxy::TRequestControls RequestControls;
TKqpTempTablesState TempTablesState;
+
+ NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig;
};
} // namespace
@@ -2157,9 +2161,10 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,
const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings,
NYql::IHTTPGateway::TPtr httpGateway, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters)
+ TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig)
{
- return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), std::move(asyncIoFactory), std::move(credentialsFactory), std::move(moduleResolverState), counters);
+ return new TKqpSessionActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), std::move(asyncIoFactory), std::move(credentialsFactory), std::move(moduleResolverState), counters, metadataProviderConfig);
}
}
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h
index 9015272288a..fd6f9fd381b 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.h
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.h
@@ -37,7 +37,8 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId,
const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings,
NYql::IHTTPGateway::TPtr httpGateway, NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
- TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters);
+ TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig);
IActor* CreateKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target);
diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
index 3fe99ef21fe..6cf0b17d17a 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp
@@ -97,7 +97,8 @@ public:
TKqpWorkerActor(const TActorId& owner, const TString& sessionId, const TKqpSettings::TConstPtr& kqpSettings,
const TKqpWorkerSettings& workerSettings, NYql::IHTTPGateway::TPtr httpGateway,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory)
+ NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig)
: Owner(owner)
, SessionId(sessionId)
, Settings(workerSettings)
@@ -106,6 +107,7 @@ public:
, Counters(counters)
, CredentialsFactory(std::move(credentialsFactory))
, Config(MakeIntrusive<TKikimrConfiguration>())
+ , MetadataProviderConfig(metadataProviderConfig)
, CreationTime(TInstant::Now())
, QueryId(0)
, ShutdownState(std::nullopt)
@@ -134,7 +136,7 @@ public:
Counters->ReportWorkerCreated(Settings.DbCounters);
std::shared_ptr<NYql::IKikimrGateway::IKqpTableMetadataLoader> loader = std::make_shared<TKqpTableMetadataLoader>(
- TlsActivationContext->ActorSystem(), Config, false);
+ TlsActivationContext->ActorSystem(), Config, false, nullptr, 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()));
Gateway = CreateKikimrIcGateway(Settings.Cluster, Settings.Database, std::move(loader),
ctx.ExecutorThread.ActorSystem, ctx.SelfID.NodeId(), RequestCounters);
@@ -1064,6 +1066,7 @@ private:
NYql::ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
TIntrusivePtr<TKqpRequestCounters> RequestCounters;
TKikimrConfiguration::TPtr Config;
+ NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig;
TInstant CreationTime;
TIntrusivePtr<IKqpGateway> Gateway;
TIntrusivePtr<IKqpHost> KqpHost;
@@ -1079,9 +1082,10 @@ IActor* CreateKqpWorkerActor(const TActorId& owner, const TString& sessionId,
const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings,
NYql::IHTTPGateway::TPtr httpGateway,
TIntrusivePtr<TModuleResolverState> moduleResolverState, TIntrusivePtr<TKqpCounters> counters,
- NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory)
+ NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig)
{
- return new TKqpWorkerActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), moduleResolverState, counters, std::move(credentialsFactory));
+ return new TKqpWorkerActor(owner, sessionId, kqpSettings, workerSettings, std::move(httpGateway), moduleResolverState, counters, std::move(credentialsFactory), metadataProviderConfig);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/session_actor/kqp_worker_common.h b/ydb/core/kqp/session_actor/kqp_worker_common.h
index 1fecdefbcc4..c7254a4fa8a 100644
--- a/ydb/core/kqp/session_actor/kqp_worker_common.h
+++ b/ydb/core/kqp/session_actor/kqp_worker_common.h
@@ -148,7 +148,8 @@ bool HasSchemeOrFatalIssues(const NYql::TIssues& issues);
IActor* CreateKqpWorkerActor(const TActorId& owner, const TString& sessionId,
const TKqpSettings::TConstPtr& kqpSettings, const TKqpWorkerSettings& workerSettings,
NYql::IHTTPGateway::TPtr httpGateway, TIntrusivePtr<TModuleResolverState> moduleResolverState,
- TIntrusivePtr<TKqpCounters> counters, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
+ TIntrusivePtr<TKqpCounters> counters, NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig);
bool IsSameProtoType(const NKikimrMiniKQL::TType& actual, const NKikimrMiniKQL::TType& expected);
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 6d98f1b4343..da0510df57f 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -821,6 +821,7 @@ namespace Tests {
Settings->AppConfig.GetTableServiceConfig(),
Settings->AppConfig.GetAuthConfig().GetTokenAccessorConfig(),
Settings->AppConfig.GetQueryServiceConfig(),
+ Settings->AppConfig.GetMetadataProviderConfig(),
TVector<NKikimrKqp::TKqpSetting>(Settings->KqpSettings),
nullptr, std::move(kqpProxySharedResources));
TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx);