summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <[email protected]>2022-03-05 14:12:28 +0300
committerhor911 <[email protected]>2022-03-05 14:12:28 +0300
commit713a14499114872162c8cea0d6982ec210ae287b (patch)
treecb8d2b7af29c8a05a6a8da173f62b9ab3eabe508
parent88843c83101102b763204cf53692cb529ffd743a (diff)
Multi tenant YQ
ref:33b90390167448d09d5c0a30911b97a9e98b1d2a
-rw-r--r--ydb/core/yq/libs/actors/pending_fetcher.cpp17
-rw-r--r--ydb/core/yq/libs/actors/pinger.cpp6
-rw-r--r--ydb/core/yq/libs/actors/proxy.h4
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp1
-rw-r--r--ydb/core/yq/libs/actors/run_actor_params.cpp4
-rw-r--r--ydb/core/yq/libs/actors/run_actor_params.h4
-rw-r--r--ydb/core/yq/libs/actors/task_get.cpp4
-rw-r--r--ydb/core/yq/libs/actors/task_ping.cpp4
-rw-r--r--ydb/core/yq/libs/config/protos/control_plane_storage.proto7
-rw-r--r--ydb/core/yq/libs/control_plane_storage/control_plane_storage.h3
-rw-r--r--ydb/core/yq/libs/control_plane_storage/events/events.h11
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp12
-rw-r--r--ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp14
-rw-r--r--ydb/core/yq/libs/control_plane_storage/message_builders.h28
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp9
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h7
-rw-r--r--ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp58
-rw-r--r--ydb/core/yq/libs/init/init.cpp6
-rw-r--r--ydb/public/api/protos/draft/yq_private.proto3
19 files changed, 157 insertions, 45 deletions
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp
index fa9afa27854..e1d10e47107 100644
--- a/ydb/core/yq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp
@@ -140,7 +140,8 @@ public:
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IHTTPGateway::TPtr s3Gateway,
::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections,
- const NMonitoring::TDynamicCounterPtr& clientCounters
+ const NMonitoring::TDynamicCounterPtr& clientCounters,
+ const TString& tenantName
)
: YqSharedResources(yqSharedResources)
, CommonConfig(commonConfig)
@@ -165,6 +166,7 @@ public:
.EnableSsl(PrivateApiConfig.GetSecureTaskService())
.Database(PrivateApiConfig.GetTaskServiceDatabase() ? PrivateApiConfig.GetTaskServiceDatabase() : TMaybe<TString>()),
ClientCounters)
+ , TenantName(tenantName)
{
Y_ENSURE(GetYqlDefaultModuleResolverWithContext(ModuleResolver));
}
@@ -260,10 +262,11 @@ private:
}
void GetPendingTask() {
- LOG_D("Request Private::GetTask" << ", Owner: " << Guid << ", Host: " << HostName());
+ LOG_D("Request Private::GetTask" << ", Owner: " << Guid << ", Host: " << HostName() << ", Tenant: " << TenantName);
Yq::Private::GetTaskRequest request;
request.set_owner_id(Guid);
request.set_host(HostName());
+ request.set_tenant(TenantName);
const auto actorSystem = NActors::TActivationContext::ActorSystem();
const auto selfId = SelfId();
Client
@@ -361,7 +364,8 @@ private:
task.query_name(),
NProtoInterop::CastFromProto(task.deadline()),
ClientCounters,
- createdAt);
+ createdAt,
+ TenantName);
auto runActorId = Register(CreateRunActor(SelfId(), queryCounters, std::move(params)));
@@ -418,6 +422,7 @@ private:
TMap<TString, TQueryCountersInfo> CountersMap;
TMap<TActorId, TString> RunActorMap;
+ TString TenantName;
};
@@ -436,7 +441,8 @@ NActors::IActor* CreatePendingFetcher(
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
IHTTPGateway::TPtr s3Gateway,
::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections,
- const NMonitoring::TDynamicCounterPtr& clientCounters)
+ const NMonitoring::TDynamicCounterPtr& clientCounters,
+ const TString& tenantName)
{
return new TPendingFetcher(
yqSharedResources,
@@ -453,7 +459,8 @@ NActors::IActor* CreatePendingFetcher(
credentialsFactory,
s3Gateway,
std::move(pqCmConnections),
- clientCounters);
+ clientCounters,
+ tenantName);
}
TActorId MakePendingFetcherId(ui32 nodeId) {
diff --git a/ydb/core/yq/libs/actors/pinger.cpp b/ydb/core/yq/libs/actors/pinger.cpp
index 30d4beecf44..0d024cdfd7b 100644
--- a/ydb/core/yq/libs/actors/pinger.cpp
+++ b/ydb/core/yq/libs/actors/pinger.cpp
@@ -142,6 +142,7 @@ class TPingerActor : public NActors::TActorBootstrapped<TPingerActor> {
public:
TPingerActor(
+ const TString& tenantName,
const TScope& scope,
const TString& userId,
const TString& id,
@@ -153,6 +154,7 @@ public:
const ::NYq::NCommon::TServiceCounters& queryCounters,
TInstant createdAt)
: Config(config)
+ , TenantName(tenantName)
, Scope(scope)
, UserId(userId)
, Id(id)
@@ -384,6 +386,7 @@ private:
void Ping(Yq::Private::PingTaskRequest request, ui64 cookie) {
QueryCounters.SetUptimePublicAndServiceCounter((TInstant::Now() - CreatedAt).Seconds());
// Fill ids
+ request.set_tenant(TenantName);
request.set_scope(Scope.ToString());
request.set_owner_id(OwnerId);
request.mutable_query_id()->set_value(Id);
@@ -419,6 +422,7 @@ private:
TConfig Config;
+ const TString TenantName;
const TScope Scope;
const TString UserId;
const TString Id;
@@ -442,6 +446,7 @@ private:
};
IActor* CreatePingerActor(
+ const TString& tenantName,
const TScope& scope,
const TString& userId,
const TString& id,
@@ -454,6 +459,7 @@ IActor* CreatePingerActor(
TInstant createdAt)
{
return new TPingerActor(
+ tenantName,
scope,
userId,
id,
diff --git a/ydb/core/yq/libs/actors/proxy.h b/ydb/core/yq/libs/actors/proxy.h
index 1d18b025481..f355e42c241 100644
--- a/ydb/core/yq/libs/actors/proxy.h
+++ b/ydb/core/yq/libs/actors/proxy.h
@@ -48,7 +48,8 @@ NActors::IActor* CreatePendingFetcher(
NYql::ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
NYql::IHTTPGateway::TPtr s3Gateway,
::NPq::NConfigurationManager::IConnections::TPtr pqCmConnections,
- const NMonitoring::TDynamicCounterPtr& clientCounters
+ const NMonitoring::TDynamicCounterPtr& clientCounters,
+ const TString& tenantName
);
NActors::IActor* CreateRunActor(
@@ -77,6 +78,7 @@ NActors::IActor* CreateResultWriter(
);
NActors::IActor* CreatePingerActor(
+ const TString& tenantName,
const TScope& scope,
const TString& userId,
const TString& id,
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index cfe604b7a07..542cc443a5d 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -107,6 +107,7 @@ public:
LogReceivedParams();
Pinger = Register(
CreatePingerActor(
+ Params.TenantName,
Params.Scope,
Params.UserId,
Params.QueryId,
diff --git a/ydb/core/yq/libs/actors/run_actor_params.cpp b/ydb/core/yq/libs/actors/run_actor_params.cpp
index e7477ebdc73..14933dd6e7c 100644
--- a/ydb/core/yq/libs/actors/run_actor_params.cpp
+++ b/ydb/core/yq/libs/actors/run_actor_params.cpp
@@ -45,7 +45,8 @@ TRunActorParams::TRunActorParams(
const TString& queryName,
const TInstant& deadline,
const NMonitoring::TDynamicCounterPtr& clientCounters,
- TInstant createdAt
+ TInstant createdAt,
+ const TString& tenantName
)
: Driver(driver)
, S3Gateway(s3Gateway)
@@ -88,6 +89,7 @@ TRunActorParams::TRunActorParams(
, Deadline(deadline)
, ClientCounters(clientCounters)
, CreatedAt(createdAt)
+ , TenantName(tenantName)
{
}
diff --git a/ydb/core/yq/libs/actors/run_actor_params.h b/ydb/core/yq/libs/actors/run_actor_params.h
index f523eea7480..fd907234f35 100644
--- a/ydb/core/yq/libs/actors/run_actor_params.h
+++ b/ydb/core/yq/libs/actors/run_actor_params.h
@@ -59,7 +59,8 @@ struct TRunActorParams { // TODO2 : Change name
const TString& queryName,
const TInstant& deadline,
const NMonitoring::TDynamicCounterPtr& clientCounters,
- TInstant createdAt
+ TInstant createdAt,
+ const TString& tenantName
);
TRunActorParams(const TRunActorParams& params) = default;
@@ -109,6 +110,7 @@ struct TRunActorParams { // TODO2 : Change name
const NMonitoring::TDynamicCounterPtr ClientCounters;
const TInstant CreatedAt;
+ const TString TenantName;
};
} /* NYq */
diff --git a/ydb/core/yq/libs/actors/task_get.cpp b/ydb/core/yq/libs/actors/task_get.cpp
index d31517aa9fe..2e01366ed97 100644
--- a/ydb/core/yq/libs/actors/task_get.cpp
+++ b/ydb/core/yq/libs/actors/task_get.cpp
@@ -86,10 +86,11 @@ public:
const auto& req = Ev->Record;
OwnerId = req.owner_id();
Host = req.host();
+ Tenant = req.tenant();
LOG_D("Request CP::GetTask with size: " << req.ByteSize() << " bytes");
RequestedMBytes->Collect(req.ByteSize() / 1024 / 1024);
ctx.Send(NYq::ControlPlaneStorageServiceActorId(),
- new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(OwnerId, Host));
+ new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(OwnerId, Host, Tenant));
}
static TString GetServiceAccountId(const YandexQuery::IamAuth& auth) {
@@ -217,6 +218,7 @@ private:
NYql::TIssues Issues;
TString OwnerId;
TString Host;
+ TString Tenant;
THolder<TEvents::TEvGetTaskResponse> Res = MakeHolder<TEvents::TEvGetTaskResponse>();
};
diff --git a/ydb/core/yq/libs/actors/task_ping.cpp b/ydb/core/yq/libs/actors/task_ping.cpp
index 29410dccd14..a035db97a87 100644
--- a/ydb/core/yq/libs/actors/task_ping.cpp
+++ b/ydb/core/yq/libs/actors/task_ping.cpp
@@ -78,6 +78,7 @@ public:
const auto& req = Ev->Record;
OperationId = req.query_id().value();
OwnerId = req.owner_id();
+ TenantName = req.tenant();
Scope = req.scope();
Deadline = NProtoInterop::CastFromProto(req.deadline());
LOG_D("Request CP::PingTask with size: " << req.ByteSize() << " bytes");
@@ -100,7 +101,7 @@ private:
)
std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> CreateControlPlaneEvent() {
- auto event = std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(Scope, OperationId, OwnerId, Deadline);
+ auto event = std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(TenantName, Scope, OperationId, OwnerId, Deadline);
const auto& req = Ev->Record;
ui64 issuesByteSize = 0;
ui64 transientIssuesByteSize = 0;
@@ -224,6 +225,7 @@ private:
TString OperationId;
TString OwnerId;
+ TString TenantName;
TString Scope;
TInstant Deadline;
NYql::TIssues Issues;
diff --git a/ydb/core/yq/libs/config/protos/control_plane_storage.proto b/ydb/core/yq/libs/config/protos/control_plane_storage.proto
index ebced909c56..086d980ccf1 100644
--- a/ydb/core/yq/libs/config/protos/control_plane_storage.proto
+++ b/ydb/core/yq/libs/config/protos/control_plane_storage.proto
@@ -8,6 +8,12 @@ import "ydb/core/yq/libs/config/protos/storage.proto";
////////////////////////////////////////////////////////////
+message TQueryMapping {
+ map<string, string> CloudIdToTenantName = 1;
+ map<string, string> ScopeToTenantName = 2;
+ repeated string CommonTenantName = 3;
+}
+
message TControlPlaneStorageConfig {
bool Enabled = 1;
NYq.NConfig.TYdbStorageConfig Storage = 2; // TODO: remove
@@ -33,4 +39,5 @@ message TControlPlaneStorageConfig {
repeated string AvailableConnection = 22;
repeated string AvailableBinding = 23;
string ResultSetsTtl = 24;
+ TQueryMapping Mapping = 25;
}
diff --git a/ydb/core/yq/libs/control_plane_storage/control_plane_storage.h b/ydb/core/yq/libs/control_plane_storage/control_plane_storage.h
index 5180aedeea7..d91a661c6a9 100644
--- a/ydb/core/yq/libs/control_plane_storage/control_plane_storage.h
+++ b/ydb/core/yq/libs/control_plane_storage/control_plane_storage.h
@@ -43,6 +43,7 @@ NActors::IActor* CreateYdbControlPlaneStorageServiceActor(
const NConfig::TCommonConfig& common,
const NMonitoring::TDynamicCounterPtr& counters,
const NYq::TYqSharedResources::TPtr& yqSharedResources,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory);
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TString& tenantName);
} // namespace NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h
index 8c98e790d72..09a73b2bf17 100644
--- a/ydb/core/yq/libs/control_plane_storage/events/events.h
+++ b/ydb/core/yq/libs/control_plane_storage/events/events.h
@@ -973,14 +973,17 @@ struct TEvControlPlaneStorage {
struct TEvGetTaskRequest : NActors::TEventLocal<TEvGetTaskRequest, EvGetTaskRequest> {
explicit TEvGetTaskRequest(
const TString& owner,
- const TString& hostName)
+ const TString& hostName,
+ const TString& tenantName)
: Owner(owner)
, HostName(hostName)
+ , TenantName(tenantName)
{
}
TString Owner;
TString HostName;
+ TString TenantName;
};
struct TTask {
@@ -1023,8 +1026,9 @@ struct TEvControlPlaneStorage {
};
struct TEvPingTaskRequest : NActors::TEventLocal<TEvPingTaskRequest, EvPingTaskRequest> {
- explicit TEvPingTaskRequest(const TString& scope, const TString& queryId, const TString& owner, const TInstant& deadline, const TString& resultId = "")
- : Scope(scope)
+ explicit TEvPingTaskRequest(const TString& tenantName, const TString& scope, const TString& queryId, const TString& owner, const TInstant& deadline, const TString& resultId = "")
+ : TenantName(tenantName)
+ , Scope(scope)
, QueryId(queryId)
, Owner(owner)
, Deadline(deadline)
@@ -1032,6 +1036,7 @@ struct TEvControlPlaneStorage {
{
}
+ const TString TenantName;
const TString Scope;
const TString QueryId;
const TString Owner;
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
index 42b3c1a1acb..c5fe378e580 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp
@@ -21,6 +21,7 @@ struct TTaskInternal {
TString HostName;
TMaybe<YandexQuery::Job> Job;
TInstant Deadline;
+ TString TenantName;
};
std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal) {
@@ -31,6 +32,7 @@ std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal) {
const auto& owner = taskInternal.Owner;
TSqlQueryBuilder queryBuilder(taskInternal.TablePathPrefix, "GetTask(write)");
+ queryBuilder.AddString("tenant", taskInternal.TenantName);
queryBuilder.AddString("scope", task.Scope);
queryBuilder.AddString("query_id", task.QueryId);
queryBuilder.AddString("query", task.Query.SerializeAsString());
@@ -53,7 +55,7 @@ std::pair<TString, NYdb::TParams> MakeSql(const TTaskInternal& taskInternal) {
"UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now,\n"
"`" RETRY_COUNTER_COLUMN_NAME "` = $retry_counter, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "` = $retry_counter_update_time, `" IS_RESIGN_QUERY_COLUMN_NAME "` = false,\n"
"`" HOST_NAME_COLUMN_NAME "` = $host, `" OWNER_COLUMN_NAME "` = $owner\n"
- "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
+ "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
);
const auto query = queryBuilder.Build();
@@ -73,6 +75,7 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam
const auto& task = taskInternal.Task;
TSqlQueryBuilder queryBuilder(taskInternal.TablePathPrefix, "GetTask(read)");
+ queryBuilder.AddString("tenant", taskInternal.TenantName);
queryBuilder.AddString("scope", task.Scope);
queryBuilder.AddString("query_id", task.QueryId);
queryBuilder.AddTimestamp("from", taskLeaseTimestamp);
@@ -83,7 +86,7 @@ std::tuple<TString, NYdb::TParams, std::function<std::pair<TString, NYdb::TParam
"WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
"SELECT `" LAST_SEEN_AT_COLUMN_NAME "`\n"
"FROM `" PENDING_SMALL_TABLE_NAME "`\n"
- "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" LAST_SEEN_AT_COLUMN_NAME "` < $from;\n"
+ "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" LAST_SEEN_AT_COLUMN_NAME "` < $from;\n"
);
auto prepareParams = [=, taskInternal=taskInternal, responseTasks=responseTasks](const TVector<TResultSet>& resultSets) mutable {
@@ -141,6 +144,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
TEvControlPlaneStorage::TEvGetTaskRequest& request = *ev->Get();
const TString owner = request.Owner;
const TString hostName = request.HostName;
+ const TString tenantName = request.TenantName;
const ui64 tasksBatchSize = Config.Proto.GetTasksBatchSize();
const ui64 numTasksProportion = Config.Proto.GetNumTasksProportion();
@@ -161,13 +165,14 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
TSqlQueryBuilder queryBuilder(YdbConnection->TablePathPrefix, "GetTask(read stale ro)");
const auto taskLeaseTimestamp = TInstant::Now() - Config.TaskLeaseTtl;
+ queryBuilder.AddString("tenant", tenantName);
queryBuilder.AddTimestamp("from", taskLeaseTimestamp);
queryBuilder.AddUint64("tasks_limit", tasksBatchSize);
queryBuilder.AddText(
"SELECT `" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`,\n"
"`" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, `" IS_RESIGN_QUERY_COLUMN_NAME "`\n"
"FROM `" PENDING_SMALL_TABLE_NAME "`\n"
- "WHERE `" LAST_SEEN_AT_COLUMN_NAME "` < $from ORDER BY `" QUERY_ID_COLUMN_NAME "` DESC LIMIT $tasks_limit;\n"
+ "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" LAST_SEEN_AT_COLUMN_NAME "` < $from ORDER BY `" QUERY_ID_COLUMN_NAME "` DESC LIMIT $tasks_limit;\n"
);
auto responseTasks = std::make_shared<TResponseTasks>();
@@ -184,6 +189,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ
TTaskInternal& taskInternal = tasks.emplace_back();
taskInternal.Owner = owner;
taskInternal.HostName = hostName;
+ taskInternal.TenantName = tenantName;
taskInternal.TablePathPrefix = YdbConnection->TablePathPrefix;
auto& task = taskInternal.Task;
diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
index 834d40c0f72..50ef9779627 100644
--- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp
@@ -22,6 +22,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
const TString& tablePathPrefix, const TDuration& automaticQueriesTtl) {
TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "HardPingTask(read)");
+ readQueryBuilder.AddString("tenant", request->TenantName);
readQueryBuilder.AddString("scope", request->Scope);
readQueryBuilder.AddString("query_id", request->QueryId);
readQueryBuilder.AddText(
@@ -32,7 +33,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
"SELECT `" JOB_ID_COLUMN_NAME "`, `" JOB_COLUMN_NAME "` FROM `" JOBS_TABLE_NAME "`\n"
" WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" JOB_ID_COLUMN_NAME "` = $last_job_id;\n"
"SELECT `" OWNER_COLUMN_NAME "` FROM `" PENDING_SMALL_TABLE_NAME "`\n"
- " WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
+ " WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
);
auto prepareParams = [=](const TVector<TResultSet>& resultSets) {
@@ -204,6 +205,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
}
TSqlQueryBuilder writeQueryBuilder(tablePathPrefix, "HardPingTask(write)");
+ writeQueryBuilder.AddString("tenant", request->TenantName);
writeQueryBuilder.AddString("scope", request->Scope);
writeQueryBuilder.AddString("job_id", jobId);
writeQueryBuilder.AddString("job", job.SerializeAsString());
@@ -217,7 +219,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
// delete pending
writeQueryBuilder.AddText(
"DELETE FROM `" PENDING_SMALL_TABLE_NAME "`\n"
- "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
+ "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
);
} else {
// update pending small
@@ -225,7 +227,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
const TString updateResignQueryFlag = request->ResignQuery ? ", `" IS_RESIGN_QUERY_COLUMN_NAME "` = true" : "";
writeQueryBuilder.AddText(
"UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now " + updateResignQueryFlag + "\n"
- "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
+ "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
);
}
@@ -277,13 +279,14 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
const TEvControlPlaneStorage::TEvPingTaskRequest* request, std::shared_ptr<YandexQuery::QueryAction> response,
const TString& tablePathPrefix) {
TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "SoftPingTask(read)");
+ readQueryBuilder.AddString("tenant", request->TenantName);
readQueryBuilder.AddString("scope", request->Scope);
readQueryBuilder.AddString("query_id", request->QueryId);
readQueryBuilder.AddText(
"SELECT `" INTERNAL_COLUMN_NAME "`\n"
"FROM `" QUERIES_TABLE_NAME "` WHERE `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" SCOPE_COLUMN_NAME "` = $scope;\n"
"SELECT `" OWNER_COLUMN_NAME "`\n"
- "FROM `" PENDING_SMALL_TABLE_NAME "` WHERE `" QUERY_ID_COLUMN_NAME "` = $query_id AND `" SCOPE_COLUMN_NAME "` = $scope;\n"
+ "FROM `" PENDING_SMALL_TABLE_NAME "` WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
);
auto prepareParams = [=](const TVector<TResultSet>& resultSets) {
@@ -317,6 +320,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
TSqlQueryBuilder writeQueryBuilder(tablePathPrefix, "SoftPingTask(write)");
writeQueryBuilder.AddTimestamp("now", request->ResignQuery ? TInstant::Zero() : TInstant::Now());
+ writeQueryBuilder.AddString("tenant", request->TenantName);
writeQueryBuilder.AddString("scope", request->Scope);
writeQueryBuilder.AddString("query_id", request->QueryId);
writeQueryBuilder.AddString("owner", request->Owner);
@@ -324,7 +328,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam
const TString updateResignQueryFlag = request->ResignQuery ? ", `" IS_RESIGN_QUERY_COLUMN_NAME "` = true" : "";
writeQueryBuilder.AddText(
"UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" LAST_SEEN_AT_COLUMN_NAME "` = $now " + updateResignQueryFlag + "\n"
- "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
+ "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
);
const auto writeQuery = writeQueryBuilder.Build();
diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h
index 75ee6f1f9fa..874b8347b8c 100644
--- a/ydb/core/yq/libs/control_plane_storage/message_builders.h
+++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h
@@ -962,12 +962,14 @@ public:
class TGetTaskBuilder {
TString Owner;
TString HostName;
+ TString TenantName;
public:
TGetTaskBuilder()
{
SetOwner("owner");
SetHostName("localhost");
+ SetTenantName("/root/tenant");
}
TGetTaskBuilder& SetOwner(const TString& owner)
@@ -982,13 +984,20 @@ public:
return *this;
}
+ TGetTaskBuilder& SetTenantName(const TString& tenantName)
+ {
+ TenantName = tenantName;
+ return *this;
+ }
+
std::unique_ptr<TEvControlPlaneStorage::TEvGetTaskRequest> Build()
{
- return std::make_unique<TEvControlPlaneStorage::TEvGetTaskRequest>(Owner, HostName);
+ return std::make_unique<TEvControlPlaneStorage::TEvGetTaskRequest>(Owner, HostName, TenantName);
}
};
class TPingTaskBuilder {
+ TString TenantName;
TString Scope;
TString QueryId;
TString ResultId;
@@ -1012,6 +1021,13 @@ public:
TPingTaskBuilder()
{
SetDeadline(TInstant::Now() + TDuration::Minutes(5));
+ SetTenantName("/root/tenant");
+ }
+
+ TPingTaskBuilder& SetTenantName(const TString& tenantName)
+ {
+ TenantName = tenantName;
+ return *this;
}
TPingTaskBuilder& SetScope(const TString& scope)
@@ -1124,7 +1140,7 @@ public:
std::unique_ptr<TEvControlPlaneStorage::TEvPingTaskRequest> Build()
{
- auto request = std::make_unique<TEvControlPlaneStorage::TEvPingTaskRequest>(Scope, QueryId, Owner, Deadline, ResultId);
+ auto request = std::make_unique<TEvControlPlaneStorage::TEvPingTaskRequest>(TenantName, Scope, QueryId, Owner, Deadline, ResultId);
request->Status = Status;
request->Issues = Issues;
request->TransientIssues = TransientIssues;
@@ -1143,7 +1159,7 @@ public:
};
class TNodesHealthCheckBuilder {
- TString Tenant;
+ TString TenantName;
ui32 NodeId = 0;
TString HostName;
TString InstanceId;
@@ -1155,9 +1171,9 @@ public:
TNodesHealthCheckBuilder()
{}
- TNodesHealthCheckBuilder& SetTenant(const TString& tenant)
+ TNodesHealthCheckBuilder& SetTenantName(const TString& tenantName)
{
- Tenant = tenant;
+ TenantName = tenantName;
return *this;
}
@@ -1200,7 +1216,7 @@ public:
std::unique_ptr<TEvControlPlaneStorage::TEvNodesHealthCheckRequest> Build()
{
Yq::Private::NodesHealthCheckRequest request;
- request.set_tenant(Tenant);
+ request.set_tenant(TenantName);
auto& node = *request.mutable_node();
node.set_node_id(NodeId);
node.set_instance_id(InstanceId);
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
index 222ebb6e8e3..f889e231c84 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage.cpp
@@ -97,6 +97,7 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreateQueriesTable(TActorSystem* as)
.AddNullableColumn(EXPIRE_AT_COLUMN_NAME, EPrimitiveType::Timestamp)
.AddNullableColumn(RESULT_SETS_EXPIRE_AT_COLUMN_NAME, EPrimitiveType::Timestamp)
.AddNullableColumn(META_REVISION_COLUMN_NAME, EPrimitiveType::Int64)
+ .AddNullableColumn(TENANT_COLUMN_NAME, EPrimitiveType::String)
.SetPrimaryKeyColumns({SCOPE_COLUMN_NAME, QUERY_ID_COLUMN_NAME})
.SetTtlSettings(EXPIRE_AT_COLUMN_NAME)
.Build();
@@ -129,7 +130,8 @@ TAsyncStatus TYdbControlPlaneStorageActor::CreatePendingSmallTable(TActorSystem*
.AddNullableColumn(IS_RESIGN_QUERY_COLUMN_NAME, EPrimitiveType::Bool)
.AddNullableColumn(HOST_NAME_COLUMN_NAME, EPrimitiveType::String)
.AddNullableColumn(OWNER_COLUMN_NAME, EPrimitiveType::String)
- .SetPrimaryKeyColumns({SCOPE_COLUMN_NAME, QUERY_ID_COLUMN_NAME})
+ .AddNullableColumn(TENANT_COLUMN_NAME, EPrimitiveType::String)
+ .SetPrimaryKeyColumns({TENANT_COLUMN_NAME, SCOPE_COLUMN_NAME, QUERY_ID_COLUMN_NAME})
.Build();
return YdbConnection->Client.RetryOperation(
@@ -675,8 +677,9 @@ NActors::IActor* CreateYdbControlPlaneStorageServiceActor(
const NConfig::TCommonConfig& common,
const NMonitoring::TDynamicCounterPtr& counters,
const ::NYq::TYqSharedResources::TPtr& yqSharedResources,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) {
- return new TYdbControlPlaneStorageActor(config, common, counters, yqSharedResources, credentialsProviderFactory);
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
+ const TString& tenantName) {
+ return new TYdbControlPlaneStorageActor(config, common, counters, yqSharedResources, credentialsProviderFactory, tenantName);
}
} // NYq
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
index 1c8cda7ee72..c7ecc7ed963 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_impl.h
@@ -184,6 +184,7 @@ class TYdbControlPlaneStorageActor : public NActors::TActorBootstrapped<TYdbCont
static constexpr int64_t InitialRevision = 1;
NKikimr::TYdbCredentialsProviderFactory CredProviderFactory;
+ TString TenantName;
public:
TYdbControlPlaneStorageActor(
@@ -191,12 +192,14 @@ public:
const NConfig::TCommonConfig& common,
const NMonitoring::TDynamicCounterPtr& counters,
const ::NYq::TYqSharedResources::TPtr& yqSharedResources,
- const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory)
+ const NKikimr::TYdbCredentialsProviderFactory& credProviderFactory,
+ const TString& tenantName)
: Counters(counters)
, FinalStatusCounters(counters)
, Config(config, common)
, YqSharedResources(yqSharedResources)
, CredProviderFactory(credProviderFactory)
+ , TenantName(tenantName)
{
}
@@ -837,6 +840,8 @@ private:
std::shared_ptr<TResponseTasks> responseTasks,
const TVector<TValidationQuery>& validators = {},
TTxSettings transactionMode = TTxSettings::SerializableRW());
+
+ TString AssignTenantName(const TString& cloudId, const TString& scope);
};
}
diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
index 398725f7f46..3d23f868768 100644
--- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
+++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp
@@ -14,6 +14,8 @@
#include <ydb/public/api/protos/yq.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
+#include <util/digest/multi.h>
+
namespace {
YandexQuery::IamAuth::IdentityCase GetIamAuth(const YandexQuery::Connection& connection) {
@@ -38,6 +40,33 @@ YandexQuery::IamAuth::IdentityCase GetIamAuth(const YandexQuery::Connection& con
namespace NYq {
+TString TYdbControlPlaneStorageActor::AssignTenantName(const TString& cloudId, const TString& scope) {
+ const auto& mapping = Config.Proto.GetMapping();
+
+ if (scope) {
+ const auto it = mapping.GetScopeToTenantName().find(scope);
+ if (it != mapping.GetScopeToTenantName().end()) {
+ return it->second;
+ }
+ }
+
+ if (cloudId) {
+ const auto it = mapping.GetCloudIdToTenantName().find(cloudId);
+ if (it != mapping.GetCloudIdToTenantName().end()) {
+ return it->second;
+ }
+ }
+
+ auto size = mapping.CommonTenantNameSize();
+
+ if (size) {
+ auto index = MultiHash(cloudId, scope) % size;
+ return mapping.GetCommonTenantName(index);
+ } else {
+ return TenantName;
+ }
+}
+
void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQueryRequest::TPtr& ev)
{
TInstant startTime = TInstant::Now();
@@ -201,6 +230,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
response->second.After.ConstructInPlace().CopyFrom(query);
TSqlQueryBuilder writeQueryBuilder(YdbConnection->TablePathPrefix, "CreateQuery(write)");
+ writeQueryBuilder.AddString("tenant", AssignTenantName(cloudId, scope));
writeQueryBuilder.AddString("scope", scope);
writeQueryBuilder.AddString("query_id", queryId);
writeQueryBuilder.AddString("name", query.content().name());
@@ -234,10 +264,10 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
// insert pending small
writeQueryBuilder.AddText(
"INSERT INTO `" PENDING_SMALL_TABLE_NAME "`\n"
- "(`" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, `" LAST_SEEN_AT_COLUMN_NAME "`,\n"
+ "(`" TENANT_COLUMN_NAME "`, `" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, `" LAST_SEEN_AT_COLUMN_NAME "`,\n"
"`" RETRY_COUNTER_COLUMN_NAME "`, `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" HOST_NAME_COLUMN_NAME "`, `" OWNER_COLUMN_NAME "`)\n"
"VALUES\n"
- " ($scope, $query_id, $query_type, $zero_timestamp, 0, $now, \"\", \"\");"
+ " ($tenant, $scope, $query_id, $query_type, $zero_timestamp, 0, $now, \"\", \"\");"
);
}
@@ -245,10 +275,11 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvCreateQuery
writeQueryBuilder.AddText(
"INSERT INTO `" QUERIES_TABLE_NAME "` (`" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`, `" NAME_COLUMN_NAME "`, `" STATUS_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, "
"`" EXECUTE_MODE_COLUMN_NAME "`, `" USER_COLUMN_NAME "`, `" VISIBILITY_COLUMN_NAME "`, `" AUTOMATIC_COLUMN_NAME "`, "
- "`" REVISION_COLUMN_NAME "`, `" QUERY_COLUMN_NAME "`, `" INTERNAL_COLUMN_NAME "`, `" LAST_JOB_ID_COLUMN_NAME "`, `" GENERATION_COLUMN_NAME "`, `" META_REVISION_COLUMN_NAME "`)\n"
+ "`" REVISION_COLUMN_NAME "`, `" QUERY_COLUMN_NAME "`, `" INTERNAL_COLUMN_NAME "`, `" LAST_JOB_ID_COLUMN_NAME "`, `" GENERATION_COLUMN_NAME "`, `" META_REVISION_COLUMN_NAME "`, "
+ "`" TENANT_COLUMN_NAME "`)\n"
"VALUES ($scope, $query_id, $name, $status, $query_type, "
"$execute_mode, $user, $visibility, $automatic, "
- "$revision, $query, $internal, $job_id, 0, 0);"
+ "$revision, $query, $internal, $job_id, 0, 0, $tenant);"
);
const auto write = writeQueryBuilder.Build();
@@ -863,6 +894,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery
response->second.CloudId = internal.cloud_id();
TSqlQueryBuilder writeQueryBuilder(YdbConnection->TablePathPrefix, "ModifyQuery(write)");
+ writeQueryBuilder.AddString("tenant", AssignTenantName(internal.cloud_id(), scope));
writeQueryBuilder.AddString("scope", scope);
writeQueryBuilder.AddString("query_id", queryId);
writeQueryBuilder.AddUint64("max_count_jobs", Config.Proto.GetMaxCountJobs());
@@ -912,10 +944,10 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery
// insert pending small
writeQueryBuilder.AddText(
"INSERT INTO `" PENDING_SMALL_TABLE_NAME "`\n"
- " (`" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`, `" LAST_SEEN_AT_COLUMN_NAME "`, `" RETRY_COUNTER_COLUMN_NAME "`, \n"
+ " (`" TENANT_COLUMN_NAME "`, `" SCOPE_COLUMN_NAME "`, `" QUERY_ID_COLUMN_NAME "`, `" LAST_SEEN_AT_COLUMN_NAME "`, `" RETRY_COUNTER_COLUMN_NAME "`, \n"
" `" RETRY_COUNTER_UPDATE_COLUMN_NAME "`, `" QUERY_TYPE_COLUMN_NAME "`, `" HOST_NAME_COLUMN_NAME "`, `" OWNER_COLUMN_NAME "`)\n"
"VALUES\n"
- " ($scope, $query_id, $zero_timestamp, 0, $now, $query_type, \"\", \"\");\n"
+ " ($tenant, $scope, $query_id, $zero_timestamp, 0, $now, $query_type, \"\", \"\");\n"
);
}
@@ -932,7 +964,8 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery
);
writeQueryBuilder.AddText(
" `" QUERY_TYPE_COLUMN_NAME "` = $query_type, `" QUERY_COLUMN_NAME "` = $query,\n"
- " `" RESULT_ID_COLUMN_NAME "` = $result_id, `" META_REVISION_COLUMN_NAME "` = `" META_REVISION_COLUMN_NAME "` + 1\n"
+ " `" RESULT_ID_COLUMN_NAME "` = $result_id, `" META_REVISION_COLUMN_NAME "` = `" META_REVISION_COLUMN_NAME "` + 1,\n"
+ " `" TENANT_COLUMN_NAME "` = $tenant\n"
"WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;"
);
@@ -1044,8 +1077,10 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDeleteQuery
queryBuilder.AddText(
"DELETE FROM `" JOBS_TABLE_NAME "`\n"
"WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
+ "$tenant = (SELECT `" TENANT_COLUMN_NAME "` FROM `" QUERIES_TABLE_NAME "`\n"
+ "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id);\n"
"DELETE FROM `" PENDING_SMALL_TABLE_NAME "`\n"
- "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
+ "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
"DELETE FROM `" QUERIES_TABLE_NAME "`\n"
"WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;"
);
@@ -1155,7 +1190,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvControlQuer
readQueryBuilder.AddString("query_id", queryId);
readQueryBuilder.AddText(
- "$selected = SELECT `" QUERY_COLUMN_NAME "`, `" LAST_JOB_ID_COLUMN_NAME "`, `" INTERNAL_COLUMN_NAME "` FROM `" QUERIES_TABLE_NAME "`\n"
+ "$selected = SELECT `" QUERY_COLUMN_NAME "`, `" LAST_JOB_ID_COLUMN_NAME "`, `" INTERNAL_COLUMN_NAME "`, `" TENANT_COLUMN_NAME "` FROM `" QUERIES_TABLE_NAME "`\n"
"WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
"$job_id = SELECT `" LAST_JOB_ID_COLUMN_NAME "` FROM $selected;\n"
"SELECT * FROM $selected;"
@@ -1171,6 +1206,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvControlQuer
auto now = TInstant::Now();
YandexQuery::Query query;
YandexQuery::Internal::QueryInternal queryInternal;
+ TString tenantName;
{
TResultSetParser parser(resultSets[0]);
if (!parser.TryNextRow()) {
@@ -1184,6 +1220,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvControlQuer
if (!queryInternal.ParseFromString(*parser.ColumnParser(INTERNAL_COLUMN_NAME).GetOptionalString())) {
ythrow TControlPlaneStorageException(TIssuesIds::INTERNAL_ERROR) << "Error parsing proto message for query internal. Please contact internal support";
}
+ tenantName = *parser.ColumnParser(TENANT_COLUMN_NAME).GetOptionalString();
}
YandexQuery::Job job;
@@ -1262,6 +1299,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvControlQuer
response->second.CloudId = queryInternal.cloud_id();
TSqlQueryBuilder writeQueryBuilder(YdbConnection->TablePathPrefix, "ControlQuery(write)");
+ writeQueryBuilder.AddString("tenant", tenantName);
writeQueryBuilder.AddString("scope", scope);
writeQueryBuilder.AddString("job", job.SerializeAsString());
writeQueryBuilder.AddString("job_id", jobId);
@@ -1278,7 +1316,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvControlQuer
"UPDATE `" QUERIES_TABLE_NAME "` SET `" QUERY_COLUMN_NAME "` = $query, `" REVISION_COLUMN_NAME "` = $revision, `" STATUS_COLUMN_NAME "` = $status, `" INTERNAL_COLUMN_NAME "` = $internal, `" META_REVISION_COLUMN_NAME "` = `" META_REVISION_COLUMN_NAME "` + 1\n"
"WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;\n"
"UPDATE `" PENDING_SMALL_TABLE_NAME "` SET `" HOST_NAME_COLUMN_NAME "` = \"\"\n"
- "WHERE `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;"
+ "WHERE `" TENANT_COLUMN_NAME "` = $tenant AND `" SCOPE_COLUMN_NAME "` = $scope AND `" QUERY_ID_COLUMN_NAME "` = $query_id;"
);
const auto writeQuery = writeQueryBuilder.Build();
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 24c0e7919a3..846b5a1bc2b 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -67,7 +67,8 @@ void Init(
protoConfig.GetCommon(),
appData->Counters->GetSubgroup("counters", "yq")->GetSubgroup("subsystem", "ControlPlaneStorage"),
yqSharedResources,
- credentialsProviderFactory);
+ credentialsProviderFactory,
+ tenant);
actorRegistrator(NYq::ControlPlaneStorageServiceActorId(), controlPlaneStorage);
}
@@ -204,7 +205,8 @@ void Init(
credentialsFactory,
httpGateway,
std::move(pqCmConnections),
- clientCounters
+ clientCounters,
+ tenant
);
actorRegistrator(MakePendingFetcherId(nodeId), fetcher);
diff --git a/ydb/public/api/protos/draft/yq_private.proto b/ydb/public/api/protos/draft/yq_private.proto
index 2af1c2f3edc..ad480d6cf0d 100644
--- a/ydb/public/api/protos/draft/yq_private.proto
+++ b/ydb/public/api/protos/draft/yq_private.proto
@@ -97,7 +97,8 @@ message PingTaskRequest {
YandexQuery.StateLoadMode state_load_mode = 18;
YandexQuery.StreamingDisposition disposition = 19;
Ydb.Operations.OperationParams operation_params = 15;
- string scope = 100; //TODO remove
+ string scope = 100;
+ string tenant = 104;
google.protobuf.Timestamp started_at = 101;
google.protobuf.Timestamp finished_at = 102;
google.protobuf.Timestamp deadline = 103;