aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksei Borzenkov <snaury@ydb.tech>2024-08-29 14:00:26 +0300
committerGitHub <noreply@github.com>2024-08-29 11:00:26 +0000
commit9c8e625b136d7f671dafd60dbf69222fdbb8eaba (patch)
tree33cf9c3c4652e6304c849d692422658f7d49760f
parent3caa09fee6dc090b1bcfbf639a0488f3d60ca281 (diff)
downloadydb-9c8e625b136d7f671dafd60dbf69222fdbb8eaba.tar.gz
Implement a new tablet administration grpc service (#8400)
-rw-r--r--ydb/core/driver_lib/run/run.cpp8
-rw-r--r--ydb/core/driver_lib/run/ya.make1
-rw-r--r--ydb/core/engine/minikql/flat_local_tx_minikql.h2
-rw-r--r--ydb/core/grpc_services/audit_dml_operations.cpp21
-rw-r--r--ydb/core/grpc_services/audit_dml_operations.h11
-rw-r--r--ydb/core/grpc_services/query/rpc_fetch_script_results.cpp2
-rw-r--r--ydb/core/grpc_services/rpc_backup.cpp2
-rw-r--r--ydb/core/grpc_services/rpc_cancel_operation.cpp8
-rw-r--r--ydb/core/grpc_services/rpc_export.cpp6
-rw-r--r--ydb/core/grpc_services/rpc_forget_operation.cpp8
-rw-r--r--ydb/core/grpc_services/rpc_get_operation.cpp8
-rw-r--r--ydb/core/grpc_services/rpc_import.cpp2
-rw-r--r--ydb/core/grpc_services/rpc_import_data.cpp4
-rw-r--r--ydb/core/grpc_services/rpc_list_operations.cpp10
-rw-r--r--ydb/core/grpc_services/rpc_login.cpp2
-rw-r--r--ydb/core/grpc_services/rpc_operation_request_base.h6
-rw-r--r--ydb/core/grpc_services/rpc_request_base.h33
-rw-r--r--ydb/core/grpc_services/tablet/rpc_change_schema.cpp160
-rw-r--r--ydb/core/grpc_services/tablet/rpc_change_schema.h11
-rw-r--r--ydb/core/grpc_services/tablet/rpc_change_schema_ut.cpp130
-rw-r--r--ydb/core/grpc_services/tablet/rpc_execute_mkql.cpp208
-rw-r--r--ydb/core/grpc_services/tablet/rpc_execute_mkql.h11
-rw-r--r--ydb/core/grpc_services/tablet/rpc_execute_mkql_ut.cpp285
-rw-r--r--ydb/core/grpc_services/tablet/rpc_restart_tablet.cpp102
-rw-r--r--ydb/core/grpc_services/tablet/rpc_restart_tablet.h11
-rw-r--r--ydb/core/grpc_services/tablet/rpc_restart_tablet_ut.cpp97
-rw-r--r--ydb/core/grpc_services/tablet/service_tablet.h15
-rw-r--r--ydb/core/grpc_services/tablet/ut/ya.make18
-rw-r--r--ydb/core/grpc_services/tablet/ya.make28
-rw-r--r--ydb/core/grpc_services/ya.make1
-rw-r--r--ydb/core/testlib/test_client.cpp2
-rw-r--r--ydb/core/testlib/ya.make1
-rw-r--r--ydb/services/tablet/ya.make15
-rw-r--r--ydb/services/tablet/ydb_tablet.cpp55
-rw-r--r--ydb/services/tablet/ydb_tablet.h31
-rw-r--r--ydb/services/ya.make1
36 files changed, 1280 insertions, 36 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index ef569e886f..cf05af4742 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -122,6 +122,7 @@
#include <ydb/services/ydb/ydb_scripting.h>
#include <ydb/services/ydb/ydb_table.h>
#include <ydb/services/ydb/ydb_object_storage.h>
+#include <ydb/services/tablet/ydb_tablet.h>
#include <ydb/core/fq/libs/init/init.h>
@@ -598,6 +599,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
names["keyvalue"] = &hasKeyValue;
TServiceCfg hasReplication = services.empty();
names["replication"] = &hasReplication;
+ TServiceCfg hasTabletService = services.empty();
+ names["tablet_service"] = &hasTabletService;
std::unordered_set<TString> enabled;
for (const auto& name : services) {
@@ -873,6 +876,11 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
grpcRequestProxies[0], hasReplication.IsRlAllowed()));
}
+ if (hasTabletService) {
+ server.AddService(new NGRpcService::TGRpcYdbTabletService(ActorSystem.Get(), Counters, grpcRequestProxies,
+ hasTabletService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue()));
+ }
+
if (ModuleFactories) {
for (const auto& service : ModuleFactories->GrpcServiceFactory.Create(enabled, disabled, ActorSystem.Get(), Counters, grpcRequestProxies[0])) {
server.AddService(service);
diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make
index b83ac0465e..30a23d8f65 100644
--- a/ydb/core/driver_lib/run/ya.make
+++ b/ydb/core/driver_lib/run/ya.make
@@ -165,6 +165,7 @@ PEERDIR(
ydb/services/persqueue_v1
ydb/services/rate_limiter
ydb/services/replication
+ ydb/services/tablet
ydb/services/ydb
)
diff --git a/ydb/core/engine/minikql/flat_local_tx_minikql.h b/ydb/core/engine/minikql/flat_local_tx_minikql.h
index 71f197789a..62686fdf1b 100644
--- a/ydb/core/engine/minikql/flat_local_tx_minikql.h
+++ b/ydb/core/engine/minikql/flat_local_tx_minikql.h
@@ -106,7 +106,7 @@ class TFlatLocalMiniKQL : public NTabletFlatExecutor::ITransaction {
bool PrepareParams(TTransactionContext &txc, const TAppData *appData) {
Y_UNUSED(txc);
if (SourceProgram.Params.Binary) {
- SerializedMiniKQLParams = SourceProgram.Program.Binary;
+ SerializedMiniKQLParams = SourceProgram.Params.Binary;
return true;
}
diff --git a/ydb/core/grpc_services/audit_dml_operations.cpp b/ydb/core/grpc_services/audit_dml_operations.cpp
index 9d428d0c6b..272d064755 100644
--- a/ydb/core/grpc_services/audit_dml_operations.cpp
+++ b/ydb/core/grpc_services/audit_dml_operations.cpp
@@ -3,6 +3,7 @@
#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/public/api/protos/ydb_scripting.pb.h>
#include <ydb/public/api/protos/ydb_query.pb.h>
+#include <ydb/public/api/protos/draft/ydb_tablet.pb.h>
#include "base/base.h"
@@ -196,4 +197,24 @@ void AuditContextAppend(IAuditCtx* ctx, const Ydb::Query::ExecuteScriptRequest&
}
// log updated_row_count collected from ExecuteScriptMetadata.exec_stats?
+// TabletService, ExecuteTabletMiniKQL
+template <>
+void AuditContextAppend(IAuditCtx* ctx, const Ydb::Tablet::ExecuteTabletMiniKQLRequest& request) {
+ if (request.dry_run()) {
+ return;
+ }
+ ctx->AddAuditLogPart("tablet_id", TStringBuilder() << request.tablet_id());
+ ctx->AddAuditLogPart("program_text", PrepareText(request.program()));
+}
+
+// TabletService, ChangeTabletSchema
+template <>
+void AuditContextAppend(IAuditCtx* ctx, const Ydb::Tablet::ChangeTabletSchemaRequest& request) {
+ if (request.dry_run()) {
+ return;
+ }
+ ctx->AddAuditLogPart("tablet_id", TStringBuilder() << request.tablet_id());
+ ctx->AddAuditLogPart("schema_changes", PrepareText(request.schema_changes()));
+}
+
} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/audit_dml_operations.h b/ydb/core/grpc_services/audit_dml_operations.h
index 9c8b600d5a..f3145e708d 100644
--- a/ydb/core/grpc_services/audit_dml_operations.h
+++ b/ydb/core/grpc_services/audit_dml_operations.h
@@ -28,6 +28,13 @@ class ExecuteScriptRequest;
}
+namespace Ydb::Tablet {
+
+class ExecuteTabletMiniKQLRequest;
+class ChangeTabletSchemaRequest;
+
+}
+
namespace NKikimr::NGRpcService {
class IAuditCtx;
@@ -80,4 +87,8 @@ template <> void AuditContextAppend(IAuditCtx* ctx, const Ydb::Query::ExecuteQue
// ExecuteSrcipt
template <> void AuditContextAppend(IAuditCtx* ctx, const Ydb::Query::ExecuteScriptRequest& request);
+// TabletService
+template <> void AuditContextAppend(IAuditCtx* ctx, const Ydb::Tablet::ExecuteTabletMiniKQLRequest& request);
+template <> void AuditContextAppend(IAuditCtx* ctx, const Ydb::Tablet::ChangeTabletSchemaRequest& request);
+
} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
index e25877095c..5f729d9907 100644
--- a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
+++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
@@ -71,7 +71,7 @@ public:
return;
}
- Register(NKqp::CreateGetScriptExecutionResultActor(SelfId(), DatabaseName, ExecutionId, req->result_set_index(), RowsOffset, req->rows_limit(), req->rows_limit() ? 0 : MAX_SIZE_LIMIT, Request->GetDeadline()));
+ Register(NKqp::CreateGetScriptExecutionResultActor(SelfId(), GetDatabaseName(), ExecutionId, req->result_set_index(), RowsOffset, req->rows_limit(), req->rows_limit() ? 0 : MAX_SIZE_LIMIT, Request->GetDeadline()));
Become(&TFetchScriptResultsRPC::StateFunc);
}
diff --git a/ydb/core/grpc_services/rpc_backup.cpp b/ydb/core/grpc_services/rpc_backup.cpp
index 94a694d759..37760e8050 100644
--- a/ydb/core/grpc_services/rpc_backup.cpp
+++ b/ydb/core/grpc_services/rpc_backup.cpp
@@ -42,7 +42,7 @@ public:
auto ev = MakeHolder<typename NSchemeShard::TEvBackup::TEvApiMapping<TIn>::TEv>();
ev->Record.SetTxId(this->TxId);
- ev->Record.SetDatabaseName(this->DatabaseName);
+ ev->Record.SetDatabaseName(this->GetDatabaseName());
if (this->UserToken) {
ev->Record.SetUserSID(this->UserToken->GetUserSID());
}
diff --git a/ydb/core/grpc_services/rpc_cancel_operation.cpp b/ydb/core/grpc_services/rpc_cancel_operation.cpp
index 885310b244..5b602ddc53 100644
--- a/ydb/core/grpc_services/rpc_cancel_operation.cpp
+++ b/ydb/core/grpc_services/rpc_cancel_operation.cpp
@@ -43,11 +43,11 @@ class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC,
IEventBase* MakeRequest() override {
switch (OperationId.GetKind()) {
case TOperationId::EXPORT:
- return new TEvExport::TEvCancelExportRequest(TxId, DatabaseName, RawOperationId);
+ return new TEvExport::TEvCancelExportRequest(TxId, GetDatabaseName(), RawOperationId);
case TOperationId::IMPORT:
- return new TEvImport::TEvCancelImportRequest(TxId, DatabaseName, RawOperationId);
+ return new TEvImport::TEvCancelImportRequest(TxId, GetDatabaseName(), RawOperationId);
case TOperationId::BUILD_INDEX:
- return new TEvIndexBuilder::TEvCancelRequest(TxId, DatabaseName, RawOperationId);
+ return new TEvIndexBuilder::TEvCancelRequest(TxId, GetDatabaseName(), RawOperationId);
default:
Y_ABORT("unreachable");
}
@@ -141,7 +141,7 @@ public:
}
void SendCancelScriptExecutionOperation() {
- Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvCancelScriptExecutionOperation(DatabaseName, OperationId));
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvCancelScriptExecutionOperation(GetDatabaseName(), OperationId));
}
private:
diff --git a/ydb/core/grpc_services/rpc_export.cpp b/ydb/core/grpc_services/rpc_export.cpp
index 6bf1b5c713..572b9c34e4 100644
--- a/ydb/core/grpc_services/rpc_export.cpp
+++ b/ydb/core/grpc_services/rpc_export.cpp
@@ -37,7 +37,7 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
auto ev = MakeHolder<TEvExport::TEvCreateExportRequest>();
ev->Record.SetTxId(this->TxId);
- ev->Record.SetDatabaseName(this->DatabaseName);
+ ev->Record.SetDatabaseName(this->GetDatabaseName());
if (this->UserToken) {
ev->Record.SetUserSID(this->UserToken->GetUserSID());
}
@@ -64,7 +64,7 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
TVector<TString> ExtractPaths() {
TVector<TString> paths;
- paths.emplace_back(this->DatabaseName); // first entry is database
+ paths.emplace_back(this->GetDatabaseName()); // first entry is database
ExtractPaths(paths, this->GetProtoRequest()->settings());
return paths;
@@ -74,7 +74,7 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
Y_ABORT_UNLESS(!paths.empty());
auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
- request->DatabaseName = this->DatabaseName;
+ request->DatabaseName = this->GetDatabaseName();
for (const auto& path : paths) {
auto& entry = request->ResultSet.emplace_back();
diff --git a/ydb/core/grpc_services/rpc_forget_operation.cpp b/ydb/core/grpc_services/rpc_forget_operation.cpp
index 4619222b6d..2dca82f6a0 100644
--- a/ydb/core/grpc_services/rpc_forget_operation.cpp
+++ b/ydb/core/grpc_services/rpc_forget_operation.cpp
@@ -44,11 +44,11 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
IEventBase* MakeRequest() override {
switch (OperationId.GetKind()) {
case TOperationId::EXPORT:
- return new TEvExport::TEvForgetExportRequest(TxId, DatabaseName, RawOperationId);
+ return new TEvExport::TEvForgetExportRequest(TxId, GetDatabaseName(), RawOperationId);
case TOperationId::IMPORT:
- return new TEvImport::TEvForgetImportRequest(TxId, DatabaseName, RawOperationId);
+ return new TEvImport::TEvForgetImportRequest(TxId, GetDatabaseName(), RawOperationId);
case TOperationId::BUILD_INDEX:
- return new TEvIndexBuilder::TEvForgetRequest(TxId, DatabaseName, RawOperationId);
+ return new TEvIndexBuilder::TEvForgetRequest(TxId, GetDatabaseName(), RawOperationId);
default:
Y_ABORT("unreachable");
}
@@ -97,7 +97,7 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
}
void SendForgetScriptExecutionOperation() {
- Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId));
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(GetDatabaseName(), OperationId));
}
public:
diff --git a/ydb/core/grpc_services/rpc_get_operation.cpp b/ydb/core/grpc_services/rpc_get_operation.cpp
index 3fdaa8b1d2..a4a8e3b765 100644
--- a/ydb/core/grpc_services/rpc_get_operation.cpp
+++ b/ydb/core/grpc_services/rpc_get_operation.cpp
@@ -56,11 +56,11 @@ class TGetOperationRPC : public TRpcOperationRequestActor<TGetOperationRPC, TEvG
IEventBase* MakeRequest() override {
switch (OperationId_.GetKind()) {
case TOperationId::EXPORT:
- return new NSchemeShard::TEvExport::TEvGetExportRequest(DatabaseName, RawOperationId_);
+ return new NSchemeShard::TEvExport::TEvGetExportRequest(GetDatabaseName(), RawOperationId_);
case TOperationId::IMPORT:
- return new NSchemeShard::TEvImport::TEvGetImportRequest(DatabaseName, RawOperationId_);
+ return new NSchemeShard::TEvImport::TEvGetImportRequest(GetDatabaseName(), RawOperationId_);
case TOperationId::BUILD_INDEX:
- return new NSchemeShard::TEvIndexBuilder::TEvGetRequest(DatabaseName, RawOperationId_);
+ return new NSchemeShard::TEvIndexBuilder::TEvGetRequest(GetDatabaseName(), RawOperationId_);
default:
Y_ABORT("unreachable");
}
@@ -199,7 +199,7 @@ private:
}
void SendGetScriptExecutionOperation() {
- Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvGetScriptExecutionOperation(DatabaseName, OperationId_));
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvGetScriptExecutionOperation(GetDatabaseName(), OperationId_));
}
void Handle(NSchemeShard::TEvExport::TEvGetExportResponse::TPtr& ev, const TActorContext& ctx) {
diff --git a/ydb/core/grpc_services/rpc_import.cpp b/ydb/core/grpc_services/rpc_import.cpp
index 174e9df84c..89061b2f8b 100644
--- a/ydb/core/grpc_services/rpc_import.cpp
+++ b/ydb/core/grpc_services/rpc_import.cpp
@@ -35,7 +35,7 @@ class TImportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>,
auto ev = MakeHolder<TEvImport::TEvCreateImportRequest>();
ev->Record.SetTxId(this->TxId);
- ev->Record.SetDatabaseName(this->DatabaseName);
+ ev->Record.SetDatabaseName(this->GetDatabaseName());
if (this->UserToken) {
ev->Record.SetUserSID(this->UserToken->GetUserSID());
}
diff --git a/ydb/core/grpc_services/rpc_import_data.cpp b/ydb/core/grpc_services/rpc_import_data.cpp
index 250421386d..75565ebcb3 100644
--- a/ydb/core/grpc_services/rpc_import_data.cpp
+++ b/ydb/core/grpc_services/rpc_import_data.cpp
@@ -120,7 +120,7 @@ class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataReque
void ResolvePath() {
auto request = MakeHolder<TNavigate>();
- request->DatabaseName = NKikimr::CanonizePath(DatabaseName);
+ request->DatabaseName = NKikimr::CanonizePath(GetDatabaseName());
auto& entry = request->ResultSet.emplace_back();
entry.Operation = TNavigate::OpTable;
@@ -179,7 +179,7 @@ class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataReque
void ResolveKeys() {
auto request = MakeHolder<TResolve>();
- request->DatabaseName = NKikimr::CanonizePath(DatabaseName);
+ request->DatabaseName = NKikimr::CanonizePath(GetDatabaseName());
request->ResultSet.emplace_back(std::move(KeyDesc));
request->ResultSet.back().Access = NACLib::UpdateRow;
diff --git a/ydb/core/grpc_services/rpc_list_operations.cpp b/ydb/core/grpc_services/rpc_list_operations.cpp
index 20742675b5..47805e7b56 100644
--- a/ydb/core/grpc_services/rpc_list_operations.cpp
+++ b/ydb/core/grpc_services/rpc_list_operations.cpp
@@ -54,13 +54,13 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
switch (ParseKind(GetProtoRequest()->kind())) {
case TOperationId::SS_BG_TASKS:
- return new NSchemeShard::NBackground::TEvListRequest(DatabaseName, request.page_size(), request.page_token());
+ return new NSchemeShard::NBackground::TEvListRequest(GetDatabaseName(), request.page_size(), request.page_token());
case TOperationId::EXPORT:
- return new TEvExport::TEvListExportsRequest(DatabaseName, request.page_size(), request.page_token(), request.kind());
+ return new TEvExport::TEvListExportsRequest(GetDatabaseName(), request.page_size(), request.page_token(), request.kind());
case TOperationId::IMPORT:
- return new TEvImport::TEvListImportsRequest(DatabaseName, request.page_size(), request.page_token(), request.kind());
+ return new TEvImport::TEvListImportsRequest(GetDatabaseName(), request.page_size(), request.page_token(), request.kind());
case TOperationId::BUILD_INDEX:
- return new TEvIndexBuilder::TEvListRequest(DatabaseName, request.page_size(), request.page_token());
+ return new TEvIndexBuilder::TEvListRequest(GetDatabaseName(), request.page_size(), request.page_token());
default:
Y_ABORT("unreachable");
}
@@ -141,7 +141,7 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T
}
void SendListScriptExecutions() {
- Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvListScriptExecutionOperations(DatabaseName, GetProtoRequest()->page_size(), GetProtoRequest()->page_token()));
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvListScriptExecutionOperations(GetDatabaseName(), GetProtoRequest()->page_size(), GetProtoRequest()->page_token()));
}
void Handle(NKqp::TEvListScriptExecutionOperationsResponse::TPtr& ev) {
diff --git a/ydb/core/grpc_services/rpc_login.cpp b/ydb/core/grpc_services/rpc_login.cpp
index f259181d2e..b80e713d74 100644
--- a/ydb/core/grpc_services/rpc_login.cpp
+++ b/ydb/core/grpc_services/rpc_login.cpp
@@ -43,7 +43,7 @@ public:
const Ydb::Auth::LoginRequest* protoRequest = GetProtoRequest();
Credentials = PrepareCredentials(protoRequest->user(), protoRequest->password(), AppData()->AuthConfig);
TString domainName = "/" + AppData()->DomainsInfo->GetDomain()->Name;
- PathToDatabase = AppData()->AuthConfig.GetDomainLoginOnly() ? domainName : DatabaseName;
+ PathToDatabase = AppData()->AuthConfig.GetDomainLoginOnly() ? domainName : GetDatabaseName();
auto sendParameters = GetSendParameters(Credentials, PathToDatabase);
Send(sendParameters.Recipient, sendParameters.Event.Release());
Become(&TThis::StateWork, Timeout, new TEvents::TEvWakeup());
diff --git a/ydb/core/grpc_services/rpc_operation_request_base.h b/ydb/core/grpc_services/rpc_operation_request_base.h
index cf8cca65c1..14548d11df 100644
--- a/ydb/core/grpc_services/rpc_operation_request_base.h
+++ b/ydb/core/grpc_services/rpc_operation_request_base.h
@@ -47,14 +47,14 @@ protected:
void ResolveDatabase() {
LOG_D("Resolve database"
- << ": name# " << this->DatabaseName);
+ << ": name# " << this->GetDatabaseName());
auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
- request->DatabaseName = this->DatabaseName;
+ request->DatabaseName = this->GetDatabaseName();
auto& entry = request->ResultSet.emplace_back();
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
- entry.Path = NKikimr::SplitPath(this->DatabaseName);
+ entry.Path = NKikimr::SplitPath(this->GetDatabaseName());
this->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release()));
}
diff --git a/ydb/core/grpc_services/rpc_request_base.h b/ydb/core/grpc_services/rpc_request_base.h
index 3a8ff0f578..e1ec39ace2 100644
--- a/ydb/core/grpc_services/rpc_request_base.h
+++ b/ydb/core/grpc_services/rpc_request_base.h
@@ -128,10 +128,16 @@ public:
explicit TRpcRequestActor(TRequestCtx* ev)
: Request(ev)
- , DatabaseName(Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData())))
+ , UserToken(CreateUserToken(Request.Get()))
{
- if (const auto& userToken = Request->GetSerializedToken()) {
- UserToken = MakeHolder<NACLib::TUserToken>(userToken);
+ }
+
+private:
+ static THolder<const NACLib::TUserToken> CreateUserToken(TRequestCtx* request) {
+ if (const auto& userToken = request->GetSerializedToken()) {
+ return MakeHolder<NACLib::TUserToken>(userToken);
+ } else {
+ return {};
}
}
@@ -164,9 +170,24 @@ private:
}
protected:
- THolder<TRequestCtx> Request;
- const TString DatabaseName;
- THolder<const NACLib::TUserToken> UserToken;
+ const TString& GetDatabaseName() const {
+ if (!DatabaseName_.has_value()) {
+ auto name = Request->GetDatabaseName();
+ if (name) {
+ DatabaseName_.emplace(std::move(*name));
+ } else {
+ DatabaseName_.emplace(DatabaseFromDomain(AppData()));
+ }
+ }
+ return *DatabaseName_;
+ }
+
+protected:
+ const THolder<TRequestCtx> Request;
+ const THolder<const NACLib::TUserToken> UserToken;
+
+private:
+ mutable std::optional<TString> DatabaseName_;
}; // TRpcRequestActor
diff --git a/ydb/core/grpc_services/tablet/rpc_change_schema.cpp b/ydb/core/grpc_services/tablet/rpc_change_schema.cpp
new file mode 100644
index 0000000000..72a2f5af84
--- /dev/null
+++ b/ydb/core/grpc_services/tablet/rpc_change_schema.cpp
@@ -0,0 +1,160 @@
+#include "rpc_change_schema.h"
+#include "service_tablet.h"
+
+#include <ydb/core/grpc_services/rpc_request_base.h>
+#include <ydb/core/grpc_services/audit_dml_operations.h>
+#include <ydb/core/base/tablet.h>
+#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/core/protos/scheme_log.pb.h>
+
+#include <library/cpp/protobuf/json/json2proto.h>
+#include <library/cpp/protobuf/json/proto2json.h>
+
+namespace NKikimr::NGRpcService {
+
+class TRpcChangeTabletSchema : public TRpcRequestActor<TRpcChangeTabletSchema, TEvChangeTabletSchemaRequest> {
+ using TBase = TRpcRequestActor<TRpcChangeTabletSchema, TEvChangeTabletSchemaRequest>;
+
+public:
+ using TBase::TBase;
+
+ void Bootstrap() {
+ if (!CheckAccess()) {
+ auto error = TStringBuilder() << "Access denied";
+ if (this->UserToken) {
+ error << ": '" << this->UserToken->GetUserSID() << "' is not an admin";
+ }
+
+ this->Reply(Ydb::StatusIds::UNAUTHORIZED, NKikimrIssues::TIssuesIds::ACCESS_DENIED, error);
+ return;
+ }
+
+ auto* req = this->GetProtoRequest();
+ AuditContextAppend(Request.Get(), *req);
+
+ try {
+ TabletId = req->tablet_id();
+ TabletReq = std::make_unique<TEvTablet::TEvLocalSchemeTx>();
+ if (const auto& changes = req->schema_changes(); !changes.empty()) {
+ NProtobufJson::Json2Proto(changes, *TabletReq->Record.MutableSchemeChanges(), {
+ .FieldNameMode = NProtobufJson::TJson2ProtoConfig::FieldNameSnakeCaseDense,
+ .AllowUnknownFields = false,
+ .MapAsObject = true,
+ .EnumValueMode = NProtobufJson::TJson2ProtoConfig::EnumSnakeCaseInsensitive,
+ });
+ }
+ TabletReq->Record.SetDryRun(req->dry_run());
+ } catch (const std::exception& e) {
+ this->Reply(Ydb::StatusIds::BAD_REQUEST, e.what());
+ return;
+ }
+
+ PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy{
+ // We need at least one retry since local resolver cache may be outdated
+ .RetryLimitCount = 1,
+ }));
+
+ Schedule(TDuration::Seconds(60), new TEvents::TEvWakeup);
+
+ Become(&TThis::StateWork);
+ }
+
+private:
+ bool CheckAccess() const {
+ if (AppData()->AdministrationAllowedSIDs.empty()) {
+ return true;
+ }
+
+ if (!this->UserToken) {
+ return false;
+ }
+
+ for (const auto& sid : AppData()->AdministrationAllowedSIDs) {
+ if (this->UserToken->IsExist(sid)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+private:
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ hFunc(TEvTablet::TEvLocalSchemeTxResponse, Handle);
+ hFunc(TEvents::TEvWakeup, Handle);
+ }
+ }
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
+ auto* msg = ev->Get();
+ if (msg->Status != NKikimrProto::OK) {
+ this->Reply(Ydb::StatusIds::UNAVAILABLE,
+ TStringBuilder() << "Tablet " << TabletId << " is unavailable");
+ return;
+ }
+
+ NTabletPipe::SendData(SelfId(), PipeClient, TabletReq.release());
+ }
+
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
+ this->Reply(Ydb::StatusIds::UNDETERMINED,
+ TStringBuilder() << "Tablet " << TabletId << " disconnected");
+ }
+
+ void Handle(TEvTablet::TEvLocalSchemeTxResponse::TPtr& ev) {
+ NTabletPipe::CloseClient(SelfId(), PipeClient);
+
+ auto* msg = ev->Get();
+ if (msg->Record.GetStatus() != NKikimrProto::OK) {
+ this->Reply(Ydb::StatusIds::GENERIC_ERROR,
+ msg->Record.HasErrorReason() ? msg->Record.GetErrorReason() : "Unknown error");
+ return;
+ }
+
+ auto* response = google::protobuf::Arena::CreateMessage<Ydb::Tablet::ChangeTabletSchemaResponse>(Request->GetArena());
+ response->set_status(Ydb::StatusIds::SUCCESS);
+ if (msg->Record.HasFullScheme()) {
+ try {
+ TString text;
+ NProtobufJson::Proto2Json(msg->Record.GetFullScheme(), text, {
+ .EnumMode = NProtobufJson::TProto2JsonConfig::EnumName,
+ .FieldNameMode = NProtobufJson::TProto2JsonConfig::FieldNameSnakeCaseDense,
+ .MapAsObject = true,
+ });
+ response->set_schema(std::move(text));
+ } catch (const std::exception& e) {
+ response->set_status(Ydb::StatusIds::GENERIC_ERROR);
+ auto* issue = response->add_issues();
+ issue->set_severity(NYql::TSeverityIds::S_ERROR);
+ issue->set_message(e.what());
+ }
+ }
+ Request->Reply(response, response->status());
+ PassAway();
+ }
+
+ void Handle(TEvents::TEvWakeup::TPtr&) {
+ NTabletPipe::CloseClient(SelfId(), PipeClient);
+ this->Reply(Ydb::StatusIds::TIMEOUT,
+ TStringBuilder() << "Tablet " << TabletId << " is not responding");
+ }
+
+private:
+ ui64 TabletId;
+ std::unique_ptr<TEvTablet::TEvLocalSchemeTx> TabletReq;
+ TActorId PipeClient;
+};
+
+void DoChangeTabletSchemaRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
+ f.RegisterActor(new TRpcChangeTabletSchema(p.release()));
+}
+
+template<>
+IActor* TEvChangeTabletSchemaRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestNoOpCtx* msg) {
+ return new TRpcChangeTabletSchema(msg);
+}
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/tablet/rpc_change_schema.h b/ydb/core/grpc_services/tablet/rpc_change_schema.h
new file mode 100644
index 0000000000..59566e28fc
--- /dev/null
+++ b/ydb/core/grpc_services/tablet/rpc_change_schema.h
@@ -0,0 +1,11 @@
+#pragma once
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/public/api/protos/draft/ydb_tablet.pb.h>
+
+namespace NKikimr::NGRpcService {
+
+using TEvChangeTabletSchemaRequest = TGrpcRequestNoOperationCall<
+ Ydb::Tablet::ChangeTabletSchemaRequest,
+ Ydb::Tablet::ChangeTabletSchemaResponse>;
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/tablet/rpc_change_schema_ut.cpp b/ydb/core/grpc_services/tablet/rpc_change_schema_ut.cpp
new file mode 100644
index 0000000000..1228ab5a3f
--- /dev/null
+++ b/ydb/core/grpc_services/tablet/rpc_change_schema_ut.cpp
@@ -0,0 +1,130 @@
+#include "rpc_change_schema.h"
+#include <ydb/core/testlib/test_client.h>
+#include <ydb/core/testlib/tablet_helpers.h>
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NGRpcService {
+
+using namespace Tests;
+
+Y_UNIT_TEST_SUITE(TabletService_ChangeSchema) {
+
+ NThreading::TFuture<Ydb::Tablet::ChangeTabletSchemaResponse> ChangeSchema(
+ TTestActorRuntime& runtime, ui64 tabletId, const TString& changes,
+ const TString& token = {},
+ bool dryRun = false)
+ {
+ // Cerr << "ChangeSchema: <<<" << changes << ">>>" << Endl;
+ Ydb::Tablet::ChangeTabletSchemaRequest request;
+ request.set_tablet_id(tabletId);
+ request.set_schema_changes(changes);
+ request.set_dry_run(dryRun);
+ return NRpcService::DoLocalRpc<TEvChangeTabletSchemaRequest>(
+ std::move(request), "/Root", token, runtime.GetActorSystem(0));
+ }
+
+ TString MakeSchemaChange() {
+ return R"__(
+ {"delta": [
+ {"delta_type": "AddTable",
+ "table_id": 5555,
+ "table_name": "MyAwesomeTable"},
+ {"delta_type": "AddColumn",
+ "table_id": 5555,
+ "column_id": 1,
+ "column_name": "MyAwesomeKey",
+ "column_type": 4},
+ {"delta_type": "AddColumnToKey",
+ "table_id": 5555,
+ "column_id": 1}
+ ]}
+ )__";
+ }
+
+ Y_UNIT_TEST(Basics) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+
+ ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain);
+
+ Cerr << "... reading schema" << Endl;
+ auto future = ChangeSchema(runtime, schemeShardId, "");
+ auto result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result:\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+ UNIT_ASSERT_C(result.schema().StartsWith(R"__({"delta":[{"delta_type":"AddTable","table_id":1,"table_name":"Paths"},)__"), result.schema());
+
+ Cerr << "... changing schema (dry run)" << Endl;
+ future = ChangeSchema(runtime, schemeShardId, MakeSchemaChange(), {}, /* dryRun */ true);
+ result = runtime.WaitFuture(std::move(future));
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+ UNIT_ASSERT_C(result.schema().Contains(R"__({"delta_type":"AddTable","table_id":5555,"table_name":"MyAwesomeTable"})__"), result.schema());
+
+ Cerr << "... reading schema" << Endl;
+ future = ChangeSchema(runtime, schemeShardId, "");
+ result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result:\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+ UNIT_ASSERT_C(!result.schema().Contains("MyAwesomeTable"), result.schema());
+
+ Cerr << "... changing schema" << Endl;
+ future = ChangeSchema(runtime, schemeShardId, MakeSchemaChange());
+ result = runtime.WaitFuture(std::move(future));
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+ UNIT_ASSERT_C(result.schema().Contains(R"__({"delta_type":"AddTable","table_id":5555,"table_name":"MyAwesomeTable"})__"), result.schema());
+
+ Cerr << "... reading schema" << Endl;
+ future = ChangeSchema(runtime, schemeShardId, "");
+ result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result:\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+ UNIT_ASSERT_C(result.schema().Contains(R"__({"delta_type":"AddTable","table_id":5555,"table_name":"MyAwesomeTable"})__"), result.schema());
+ }
+
+ Y_UNIT_TEST(OnlyAdminsAllowed) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+ runtime.GetAppData().AdministrationAllowedSIDs.push_back("root@builtin");
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+
+ ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain);
+
+ Cerr << "... reading schema (without token)" << Endl;
+ auto future = ChangeSchema(runtime, schemeShardId, "");
+ auto result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result:\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::UNAUTHORIZED, result.DebugString());
+
+ Cerr << "... reading schema (non-admin token)" << Endl;
+ future = ChangeSchema(runtime, schemeShardId, "", NACLib::TUserToken("user@builtin", {}).SerializeAsString());
+ result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result:\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::UNAUTHORIZED, result.DebugString());
+
+ Cerr << "... reading schema (admin token)" << Endl;
+ future = ChangeSchema(runtime, schemeShardId, "", NACLib::TUserToken("root@builtin", {}).SerializeAsString());
+ result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result:\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+ }
+
+} // Y_UNIT_TEST_SUITE(TabletService_ChangeSchema)
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/tablet/rpc_execute_mkql.cpp b/ydb/core/grpc_services/tablet/rpc_execute_mkql.cpp
new file mode 100644
index 0000000000..921730f520
--- /dev/null
+++ b/ydb/core/grpc_services/tablet/rpc_execute_mkql.cpp
@@ -0,0 +1,208 @@
+#include "rpc_execute_mkql.h"
+#include "service_tablet.h"
+
+#include <ydb/core/grpc_services/rpc_request_base.h>
+#include <ydb/core/grpc_services/audit_dml_operations.h>
+#include <ydb/core/base/tablet.h>
+#include <ydb/core/base/tablet_pipe.h>
+#include <ydb/core/protos/tx_proxy.pb.h>
+
+#include <ydb/library/mkql_proto/mkql_proto.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/mkql_alloc.h>
+#include <ydb/library/yql/minikql/mkql_mem_info.h>
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/minikql/mkql_node_serialization.h>
+
+namespace NKikimr::NGRpcService {
+
+
+class TRpcExecuteTabletMiniKQL : public TRpcRequestActor<TRpcExecuteTabletMiniKQL, TEvExecuteTabletMiniKQLRequest> {
+ using TBase = TRpcRequestActor<TRpcExecuteTabletMiniKQL, TEvExecuteTabletMiniKQLRequest>;
+
+public:
+ using TBase::TBase;
+
+ void Bootstrap() {
+ if (!CheckAccess()) {
+ auto error = TStringBuilder() << "Access denied";
+ if (this->UserToken) {
+ error << ": '" << this->UserToken->GetUserSID() << "' is not an admin";
+ }
+
+ this->Reply(Ydb::StatusIds::UNAUTHORIZED, NKikimrIssues::TIssuesIds::ACCESS_DENIED, error);
+ return;
+ }
+
+ auto* req = this->GetProtoRequest();
+ AuditContextAppend(Request.Get(), *req);
+
+ try {
+ TabletId = req->tablet_id();
+ TabletReq = std::make_unique<TEvTablet::TEvLocalMKQL>();
+ auto* tx = TabletReq->Record.MutableProgram();
+ tx->MutableProgram()->SetText(req->program());
+ if (const auto& params = req->parameters(); !params.empty()) {
+ auto* functionRegistry = AppData()->FunctionRegistry;
+ NMiniKQL::TScopedAlloc alloc(__LOCATION__, TAlignedPagePoolCounters(), functionRegistry->SupportsSizedAllocators());
+ NMiniKQL::TTypeEnvironment env(alloc);
+ NMiniKQL::TMemoryUsageInfo memInfo("TRpcExecuteTabletMiniKQL");
+ NMiniKQL::THolderFactory factory(alloc.Ref(), memInfo, functionRegistry);
+ // NKikimrMiniKQL.TParams
+ auto* protoParams = tx->MutableParams()->MutableProto();
+ protoParams->MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Struct);
+ auto* protoStructType = protoParams->MutableType()->MutableStruct();
+ auto* protoValue = protoParams->MutableValue();
+ for (const auto& pr : params) {
+ auto* protoMember = protoStructType->AddMember();
+ protoMember->SetName(pr.first);
+ auto [pType, value] = NMiniKQL::ImportValueFromProto(pr.second.type(), pr.second.value(), env, factory);
+ ExportTypeToProto(pType, *protoMember->MutableType());
+ ExportValueToProto(pType, value, *protoValue->AddStruct());
+ }
+ // Tablet needs a serialized runtime node for parameters
+ auto node = NMiniKQL::ImportValueFromProto(*protoParams, env);
+ tx->MutableParams()->SetBin(NMiniKQL::SerializeRuntimeNode(node, env));
+ // We no longer need the protobuf parameters
+ tx->MutableParams()->ClearProto();
+ }
+
+ if (req->dry_run()) {
+ tx->SetMode(NKikimrTxUserProxy::TMiniKQLTransaction::COMPILE);
+ }
+ } catch (const std::exception& e) {
+ this->Reply(Ydb::StatusIds::BAD_REQUEST, e.what());
+ return;
+ }
+
+ PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy{
+ // We need at least one retry since local resolver cache may be outdated
+ .RetryLimitCount = 1,
+ }));
+
+ Schedule(TDuration::Seconds(60), new TEvents::TEvWakeup);
+
+ Become(&TThis::StateWork);
+ }
+
+private:
+ bool CheckAccess() const {
+ if (AppData()->AdministrationAllowedSIDs.empty()) {
+ return true;
+ }
+
+ if (!this->UserToken) {
+ return false;
+ }
+
+ for (const auto& sid : AppData()->AdministrationAllowedSIDs) {
+ if (this->UserToken->IsExist(sid)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+private:
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ hFunc(TEvTablet::TEvLocalMKQLResponse, Handle);
+ hFunc(TEvents::TEvWakeup, Handle);
+ }
+ }
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
+ auto* msg = ev->Get();
+ if (msg->Status != NKikimrProto::OK) {
+ this->Reply(Ydb::StatusIds::UNAVAILABLE,
+ TStringBuilder() << "Tablet " << TabletId << " is unavailable");
+ return;
+ }
+
+ NTabletPipe::SendData(SelfId(), PipeClient, TabletReq.release());
+ }
+
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
+ this->Reply(Ydb::StatusIds::UNDETERMINED,
+ TStringBuilder() << "Tablet " << TabletId << " disconnected");
+ }
+
+ void Handle(TEvTablet::TEvLocalMKQLResponse::TPtr& ev) {
+ NTabletPipe::CloseClient(SelfId(), PipeClient);
+
+ auto* msg = ev->Get();
+ auto* response = google::protobuf::Arena::CreateMessage<Ydb::Tablet::ExecuteTabletMiniKQLResponse>(Request->GetArena());
+
+ if (msg->Record.HasCompileResults()) {
+ for (const auto& issue : msg->Record.GetCompileResults().GetProgramCompileErrors()) {
+ *response->add_issues() = issue;
+ }
+ for (const auto& issue : msg->Record.GetCompileResults().GetParamsCompileErrors()) {
+ *response->add_issues() = issue;
+ }
+ }
+
+ if (const TString& errors = msg->Record.GetMiniKQLErrors(); !errors.empty()) {
+ auto* issue = response->add_issues();
+ issue->set_severity(NYql::TSeverityIds::S_ERROR);
+ issue->set_message(errors);
+ }
+
+ if (msg->Record.GetStatus() != NKikimrProto::OK) {
+ response->set_status(Ydb::StatusIds::GENERIC_ERROR);
+ Request->Reply(response, response->status());
+ return PassAway();
+ }
+
+ response->set_status(Ydb::StatusIds::SUCCESS);
+
+ if (msg->Record.HasExecutionEngineEvaluatedResponse()) {
+ try {
+ const auto& protoResult = msg->Record.GetExecutionEngineEvaluatedResponse();
+ auto* functionRegistry = AppData()->FunctionRegistry;
+ NMiniKQL::TScopedAlloc alloc(__LOCATION__, TAlignedPagePoolCounters(), functionRegistry->SupportsSizedAllocators());
+ NMiniKQL::TTypeEnvironment env(alloc);
+ NMiniKQL::TMemoryUsageInfo memInfo("TRpcExecuteTabletMiniKQL");
+ NMiniKQL::THolderFactory factory(alloc.Ref(), memInfo, functionRegistry);
+ auto [pType, value] = NMiniKQL::ImportValueFromProto(protoResult.GetType(), protoResult.GetValue(), env, factory);
+ ExportTypeToProto(pType, *response->mutable_result()->mutable_type());
+ ExportValueToProto(pType, value, *response->mutable_result()->mutable_value());
+ } catch (const std::exception& e) {
+ response->set_status(Ydb::StatusIds::GENERIC_ERROR);
+ auto* issue = response->add_issues();
+ issue->set_severity(NYql::TSeverityIds::S_ERROR);
+ issue->set_message(e.what());
+ Request->Reply(response, response->status());
+ return PassAway();
+ }
+ }
+
+ Request->Reply(response, response->status());
+ return PassAway();
+ }
+
+ void Handle(TEvents::TEvWakeup::TPtr&) {
+ NTabletPipe::CloseClient(SelfId(), PipeClient);
+ this->Reply(Ydb::StatusIds::TIMEOUT,
+ TStringBuilder() << "Tablet " << TabletId << " is not responding");
+ }
+
+private:
+ ui64 TabletId;
+ std::unique_ptr<TEvTablet::TEvLocalMKQL> TabletReq;
+ TActorId PipeClient;
+};
+
+void DoExecuteTabletMiniKQLRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
+ f.RegisterActor(new TRpcExecuteTabletMiniKQL(p.release()));
+}
+
+template<>
+IActor* TEvExecuteTabletMiniKQLRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestNoOpCtx* msg) {
+ return new TRpcExecuteTabletMiniKQL(msg);
+}
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/tablet/rpc_execute_mkql.h b/ydb/core/grpc_services/tablet/rpc_execute_mkql.h
new file mode 100644
index 0000000000..45c5f4421d
--- /dev/null
+++ b/ydb/core/grpc_services/tablet/rpc_execute_mkql.h
@@ -0,0 +1,11 @@
+#pragma once
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/public/api/protos/draft/ydb_tablet.pb.h>
+
+namespace NKikimr::NGRpcService {
+
+using TEvExecuteTabletMiniKQLRequest = TGrpcRequestNoOperationCall<
+ Ydb::Tablet::ExecuteTabletMiniKQLRequest,
+ Ydb::Tablet::ExecuteTabletMiniKQLResponse>;
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/tablet/rpc_execute_mkql_ut.cpp b/ydb/core/grpc_services/tablet/rpc_execute_mkql_ut.cpp
new file mode 100644
index 0000000000..9eb251ec44
--- /dev/null
+++ b/ydb/core/grpc_services/tablet/rpc_execute_mkql_ut.cpp
@@ -0,0 +1,285 @@
+#include "rpc_execute_mkql.h"
+#include <ydb/core/testlib/test_client.h>
+#include <ydb/core/testlib/tablet_helpers.h>
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NGRpcService {
+
+using namespace Tests;
+
+Y_UNIT_TEST_SUITE(TabletService_ExecuteMiniKQL) {
+
+ Ydb::TypedValue MakeUint64(ui64 value) {
+ Ydb::TypedValue ret;
+ ret.mutable_type()->set_type_id(Ydb::Type::UINT64);
+ ret.mutable_value()->set_uint64_value(value);
+ return ret;
+ }
+
+ NThreading::TFuture<Ydb::Tablet::ExecuteTabletMiniKQLResponse> ExecuteMiniKQL(
+ TTestActorRuntime& runtime, ui64 tabletId, const TString& program,
+ const std::unordered_map<TString, Ydb::TypedValue>& params = {},
+ const TString& token = {},
+ bool dryRun = false)
+ {
+ // Cerr << "ExecuteMiniKQL: <<<" << program << ">>>" << Endl;
+ Ydb::Tablet::ExecuteTabletMiniKQLRequest request;
+ request.set_tablet_id(tabletId);
+ request.set_program(program);
+ for (const auto& pr : params) {
+ (*request.mutable_parameters())[pr.first] = pr.second;
+ }
+ request.set_dry_run(dryRun);
+ return NRpcService::DoLocalRpc<TEvExecuteTabletMiniKQLRequest>(
+ std::move(request), "/Root", token, runtime.GetActorSystem(0));
+ }
+
+ Y_UNIT_TEST(BasicMiniKQLRead) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+
+ ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain);
+ auto future = ExecuteMiniKQL(runtime, schemeShardId, R"___((
+ (let key '('('Id (Uint64 '1))))
+ (let select '('Id 'Name))
+ (return (AsList
+ (SetResult 'row (SelectRow 'Paths key select))
+ ))
+ ))___");
+ auto result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result:\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ result.result().value().ShortDebugString(),
+ "items { items { uint64_value: 1 } items { text_value: \"Root\" } }",
+ result.DebugString());
+ }
+
+ Y_UNIT_TEST(ParamsMiniKQLRead) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+
+ ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain);
+ auto future = ExecuteMiniKQL(runtime, schemeShardId, R"___((
+ (let p (Parameter 'p (DataType 'Uint64)))
+ (let key '('('Id p)))
+ (let select '('Id 'Name))
+ (return (AsList
+ (SetResult 'p p)
+ (SetResult 'row (SelectRow 'Paths key select))
+ ))
+ ))___", {{"p", MakeUint64(1)}});
+ auto result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result:\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ result.result().value().ShortDebugString(),
+ "items { uint64_value: 1 } items { items { uint64_value: 1 } items { text_value: \"Root\" } }",
+ result.DebugString());
+ }
+
+ Ydb::TypedValue MakeMalformedValue() {
+ Ydb::TypedValue ret;
+ // Type is a struct with 2 members
+ Ydb::StructType* s = ret.mutable_type()->mutable_struct_type();
+ Ydb::StructMember* m1 = s->add_members();
+ m1->set_name("m1");
+ m1->mutable_type()->set_type_id(Ydb::Type::UINT64);
+ Ydb::StructMember* m2 = s->add_members();
+ m2->set_name("m2");
+ m2->mutable_type()->set_type_id(Ydb::Type::UINT64);
+ // Value has only one member: malformed
+ ret.mutable_value()->add_items()->set_uint64_value(42);
+ return ret;
+ }
+
+ Y_UNIT_TEST(MalformedParams) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+
+ ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain);
+ auto future = ExecuteMiniKQL(runtime, schemeShardId, R"___((
+ (let p (Parameter 'p (DataType 'Uint64)))
+ (let key '('('Id p)))
+ (let select '('Id 'Name))
+ (return (AsList
+ (SetResult 'p p)
+ (SetResult 'row (SelectRow 'Paths key select))
+ ))
+ ))___", {{"p", MakeMalformedValue()}});
+ auto result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result:\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::BAD_REQUEST, result.DebugString());
+ }
+
+ Y_UNIT_TEST(MalformedProgram) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+
+ ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain);
+ auto future = ExecuteMiniKQL(runtime, schemeShardId, R"___((
+ (let key '('('Id (Uint64 '1))))
+ (let select '('Id 'Name))
+ (return (AsList
+ (SetResult 'row (SelectRow 'NoSuchTable key select))
+ ))
+ ))___");
+ auto result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result:\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::GENERIC_ERROR, result.DebugString());
+ }
+
+ Y_UNIT_TEST(DryRunEraseRow) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+
+ ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain);
+ auto future = ExecuteMiniKQL(runtime, schemeShardId, R"___((
+ (let key '('('Id (Uint64 '1))))
+ (return (AsList
+ (EraseRow 'Paths key)
+ ))
+ ))___", {}, {}, /* dryRun */ true);
+ auto result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result (dry run EraseRow):\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+
+ future = ExecuteMiniKQL(runtime, schemeShardId, R"___((
+ (let key '('('Id (Uint64 '1))))
+ (let select '('Id 'Name))
+ (return (AsList
+ (SetResult 'row (SelectRow 'Paths key select))
+ ))
+ ))___");
+ result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result (SelectRow):\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ result.result().value().ShortDebugString(),
+ "items { items { uint64_value: 1 } items { text_value: \"Root\" } }",
+ result.DebugString());
+
+ // Repeat request without dry_run
+ future = ExecuteMiniKQL(runtime, schemeShardId, R"___((
+ (let key '('('Id (Uint64 '1))))
+ (return (AsList
+ (EraseRow 'Paths key)
+ ))
+ ))___");
+ result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result (EraseRow):\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+
+ future = ExecuteMiniKQL(runtime, schemeShardId, R"___((
+ (let key '('('Id (Uint64 '1))))
+ (let select '('Id 'Name))
+ (return (AsList
+ (SetResult 'row (SelectRow 'Paths key select))
+ ))
+ ))___");
+ result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result (SelectRow):\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ result.result().value().ShortDebugString(),
+ "items { nested_value { null_flag_value: NULL_VALUE } }",
+ result.DebugString());
+ }
+
+ Y_UNIT_TEST(OnlyAdminsAllowed) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+ runtime.GetAppData().AdministrationAllowedSIDs.push_back("root@builtin");
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+
+ ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain);
+ auto future = ExecuteMiniKQL(runtime, schemeShardId, R"___((
+ (let key '('('Id (Uint64 '1))))
+ (let select '('Id 'Name))
+ (return (AsList
+ (SetResult 'row (SelectRow 'Paths key select))
+ ))
+ ))___");
+ auto result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result:\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::UNAUTHORIZED, result.DebugString());
+
+ future = ExecuteMiniKQL(runtime, schemeShardId, R"___((
+ (let key '('('Id (Uint64 '1))))
+ (let select '('Id 'Name))
+ (return (AsList
+ (SetResult 'row (SelectRow 'Paths key select))
+ ))
+ ))___", {}, NACLib::TUserToken("user@builtin", {}).SerializeAsString());
+ result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result:\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::UNAUTHORIZED, result.DebugString());
+
+ future = ExecuteMiniKQL(runtime, schemeShardId, R"___((
+ (let key '('('Id (Uint64 '1))))
+ (let select '('Id 'Name))
+ (return (AsList
+ (SetResult 'row (SelectRow 'Paths key select))
+ ))
+ ))___", {}, NACLib::TUserToken("root@builtin", {}).SerializeAsString());
+ result = runtime.WaitFuture(std::move(future));
+ // Cerr << "Got result:\n" << result.DebugString();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ result.result().value().ShortDebugString(),
+ "items { items { uint64_value: 1 } items { text_value: \"Root\" } }",
+ result.DebugString());
+ }
+
+} // Y_UNIT_TEST_SUITE(TabletService_ExecuteMiniKQL)
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/tablet/rpc_restart_tablet.cpp b/ydb/core/grpc_services/tablet/rpc_restart_tablet.cpp
new file mode 100644
index 0000000000..3b1d91d252
--- /dev/null
+++ b/ydb/core/grpc_services/tablet/rpc_restart_tablet.cpp
@@ -0,0 +1,102 @@
+#include "rpc_restart_tablet.h"
+#include "service_tablet.h"
+
+#include <ydb/core/grpc_services/rpc_request_base.h>
+#include <ydb/core/base/tablet_pipe.h>
+
+namespace NKikimr::NGRpcService {
+
+class TRpcRestartTablet : public TRpcRequestActor<TRpcRestartTablet, TEvRestartTabletRequest> {
+ using TBase = TRpcRequestActor<TRpcRestartTablet, TEvRestartTabletRequest>;
+
+public:
+ using TBase::TBase;
+
+ void Bootstrap() {
+ if (!CheckAccess()) {
+ auto error = TStringBuilder() << "Access denied";
+ if (this->UserToken) {
+ error << ": '" << this->UserToken->GetUserSID() << "' is not an admin";
+ }
+
+ this->Reply(Ydb::StatusIds::UNAUTHORIZED, NKikimrIssues::TIssuesIds::ACCESS_DENIED, error);
+ return;
+ }
+
+ auto* req = this->GetProtoRequest();
+ TabletId = req->tablet_id();
+ PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy{
+ // We need at least one retry since local resolver cache may be outdated
+ .RetryLimitCount = 1,
+ }));
+
+ Schedule(TDuration::Seconds(60), new TEvents::TEvWakeup);
+
+ Become(&TThis::StateWork);
+ }
+
+private:
+ bool CheckAccess() const {
+ if (AppData()->AdministrationAllowedSIDs.empty()) {
+ return true;
+ }
+
+ if (!this->UserToken) {
+ return false;
+ }
+
+ for (const auto& sid : AppData()->AdministrationAllowedSIDs) {
+ if (this->UserToken->IsExist(sid)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+private:
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvTabletPipe::TEvClientConnected, Handle);
+ hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
+ hFunc(TEvents::TEvWakeup, Handle);
+ }
+ }
+
+ void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) {
+ auto* msg = ev->Get();
+ if (msg->Status != NKikimrProto::OK) {
+ this->Reply(Ydb::StatusIds::UNAVAILABLE,
+ TStringBuilder() << "Tablet " << TabletId << " is unavailable");
+ return;
+ }
+
+ // Note: we send the poison message and wait for the pipe to close
+ NTabletPipe::SendData(SelfId(), PipeClient, new TEvents::TEvPoison);
+ }
+
+ void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) {
+ this->Reply(Ydb::StatusIds::SUCCESS);
+ }
+
+ void Handle(TEvents::TEvWakeup::TPtr&) {
+ NTabletPipe::CloseClient(SelfId(), PipeClient);
+ this->Reply(Ydb::StatusIds::TIMEOUT,
+ TStringBuilder() << "Tablet " << TabletId << " is not responding");
+ }
+
+private:
+ ui64 TabletId;
+ TActorId PipeClient;
+};
+
+void DoRestartTabletRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
+ f.RegisterActor(new TRpcRestartTablet(p.release()));
+}
+
+template<>
+IActor* TEvRestartTabletRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestNoOpCtx* msg) {
+ return new TRpcRestartTablet(msg);
+}
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/tablet/rpc_restart_tablet.h b/ydb/core/grpc_services/tablet/rpc_restart_tablet.h
new file mode 100644
index 0000000000..f4615e9cee
--- /dev/null
+++ b/ydb/core/grpc_services/tablet/rpc_restart_tablet.h
@@ -0,0 +1,11 @@
+#pragma once
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/public/api/protos/draft/ydb_tablet.pb.h>
+
+namespace NKikimr::NGRpcService {
+
+using TEvRestartTabletRequest = TGrpcRequestNoOperationCall<
+ Ydb::Tablet::RestartTabletRequest,
+ Ydb::Tablet::RestartTabletResponse>;
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/tablet/rpc_restart_tablet_ut.cpp b/ydb/core/grpc_services/tablet/rpc_restart_tablet_ut.cpp
new file mode 100644
index 0000000000..4b585fcf7d
--- /dev/null
+++ b/ydb/core/grpc_services/tablet/rpc_restart_tablet_ut.cpp
@@ -0,0 +1,97 @@
+#include "rpc_restart_tablet.h"
+#include <ydb/core/testlib/test_client.h>
+#include <ydb/core/testlib/tablet_helpers.h>
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr::NGRpcService {
+
+using namespace Tests;
+
+Y_UNIT_TEST_SUITE(TabletService_Restart) {
+
+ NThreading::TFuture<Ydb::Tablet::RestartTabletResponse> RestartRpc(
+ TTestActorRuntime& runtime, ui64 tabletId,
+ const TString& token = {})
+ {
+ Ydb::Tablet::RestartTabletRequest request;
+ request.set_tablet_id(tabletId);
+ return NRpcService::DoLocalRpc<TEvRestartTabletRequest>(
+ std::move(request), "/Root", token, runtime.GetActorSystem(0));
+ }
+
+ Y_UNIT_TEST(Basics) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+
+ ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain);
+ auto actorBefore = ResolveTablet(runtime, schemeShardId);
+
+ Cerr << "... restarting tablet " << schemeShardId << Endl;
+ auto future = RestartRpc(runtime, schemeShardId);
+ auto result = runtime.WaitFuture(std::move(future));
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ InvalidateTabletResolverCache(runtime, schemeShardId);
+ auto actorAfter = ResolveTablet(runtime, schemeShardId);
+
+ UNIT_ASSERT_C(actorBefore != actorAfter, "SchemeShard actor " << actorBefore << " didn't change");
+ }
+
+ Y_UNIT_TEST(OnlyAdminsAllowed) {
+ TPortManager pm;
+ TServerSettings serverSettings(pm.GetPort(2134));
+ serverSettings.SetDomainName("Root")
+ .SetUseRealThreads(false);
+
+ Tests::TServer::TPtr server = new TServer(serverSettings);
+ auto& runtime = *server->GetRuntime();
+ runtime.GetAppData().AdministrationAllowedSIDs.push_back("root@builtin");
+
+ auto sender = runtime.AllocateEdgeActor();
+ server->SetupRootStoragePools(sender);
+
+ ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain);
+ auto actorBefore = ResolveTablet(runtime, schemeShardId);
+
+ Cerr << "... restarting tablet " << schemeShardId << " (without token)" << Endl;
+ auto future = RestartRpc(runtime, schemeShardId);
+ auto result = runtime.WaitFuture(std::move(future));
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::UNAUTHORIZED, result.DebugString());
+
+ Cerr << "... restarting tablet " << schemeShardId << " (non-admin token)" << Endl;
+ future = RestartRpc(runtime, schemeShardId, NACLib::TUserToken("user@builtin", {}).SerializeAsString());
+ result = runtime.WaitFuture(std::move(future));
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::UNAUTHORIZED, result.DebugString());
+
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ InvalidateTabletResolverCache(runtime, schemeShardId);
+ auto actorNoRestart = ResolveTablet(runtime, schemeShardId);
+
+ UNIT_ASSERT_C(actorBefore == actorNoRestart, "SchemeShard actor " << actorBefore << " changed to " << actorNoRestart);
+
+ Cerr << "... restarting tablet " << schemeShardId << " (admin token)" << Endl;
+ future = RestartRpc(runtime, schemeShardId, NACLib::TUserToken("root@builtin", {}).SerializeAsString());
+ result = runtime.WaitFuture(std::move(future));
+ UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString());
+
+ runtime.SimulateSleep(TDuration::Seconds(1));
+ InvalidateTabletResolverCache(runtime, schemeShardId);
+ auto actorAfter = ResolveTablet(runtime, schemeShardId);
+
+ UNIT_ASSERT_C(actorBefore != actorAfter, "SchemeShard actor " << actorBefore << " didn't change");
+ }
+
+} // Y_UNIT_TEST_SUITE(TabletService_Restart)
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/tablet/service_tablet.h b/ydb/core/grpc_services/tablet/service_tablet.h
new file mode 100644
index 0000000000..07b494d783
--- /dev/null
+++ b/ydb/core/grpc_services/tablet/service_tablet.h
@@ -0,0 +1,15 @@
+#pragma once
+
+#include <memory>
+
+namespace NKikimr::NGRpcService {
+
+class IRequestOpCtx;
+class IRequestNoOpCtx;
+class IFacilityProvider;
+
+void DoExecuteTabletMiniKQLRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
+void DoChangeTabletSchemaRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
+void DoRestartTabletRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f);
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/tablet/ut/ya.make b/ydb/core/grpc_services/tablet/ut/ya.make
new file mode 100644
index 0000000000..077370dd8c
--- /dev/null
+++ b/ydb/core/grpc_services/tablet/ut/ya.make
@@ -0,0 +1,18 @@
+UNITTEST_FOR(ydb/core/grpc_services/tablet)
+
+SIZE(MEDIUM)
+
+SRCS(
+ rpc_change_schema_ut.cpp
+ rpc_execute_mkql_ut.cpp
+ rpc_restart_tablet_ut.cpp
+)
+
+PEERDIR(
+ ydb/core/testlib/default
+ ydb/core/grpc_services/local_rpc
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/grpc_services/tablet/ya.make b/ydb/core/grpc_services/tablet/ya.make
new file mode 100644
index 0000000000..7c832b6ad3
--- /dev/null
+++ b/ydb/core/grpc_services/tablet/ya.make
@@ -0,0 +1,28 @@
+LIBRARY()
+
+SRCS(
+ rpc_change_schema.cpp
+ rpc_execute_mkql.cpp
+ rpc_restart_tablet.cpp
+ service_tablet.h
+)
+
+PEERDIR(
+ ydb/core/base
+ ydb/core/grpc_services
+ ydb/core/grpc_services/base
+ ydb/core/protos
+ ydb/library/mkql_proto
+ ydb/library/yql/minikql
+ ydb/library/yql/minikql/computation
+ ydb/public/api/protos
+ library/cpp/protobuf/json
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
+
+RECURSE_FOR_TESTS(
+ ut
+)
diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make
index 63344c31d9..7b9eb6b92f 100644
--- a/ydb/core/grpc_services/ya.make
+++ b/ydb/core/grpc_services/ya.make
@@ -152,6 +152,7 @@ RECURSE(
base
counters
local_rpc
+ tablet
)
RECURSE_FOR_TESTS(
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index edfb1904ce..8efd3790b3 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -29,6 +29,7 @@
#include <ydb/services/ydb/ydb_scripting.h>
#include <ydb/services/ydb/ydb_table.h>
#include <ydb/services/ydb/ydb_logstore.h>
+#include <ydb/services/tablet/ydb_tablet.h>
#include <ydb/services/discovery/grpc_service.h>
#include <ydb/services/rate_limiter/grpc_service.h>
#include <ydb/services/persqueue_cluster_discovery/grpc_service.h>
@@ -429,6 +430,7 @@ namespace Tests {
GRpcServer->AddService(new NGRpcService::TGRpcYmqService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxies[0], true));
GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies, true, 1));
+ GRpcServer->AddService(new NGRpcService::TGRpcYdbTabletService(system, counters, grpcRequestProxies, true, 1));
if (Settings->EnableYq) {
GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxies[0]));
GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxies[0]));
diff --git a/ydb/core/testlib/ya.make b/ydb/core/testlib/ya.make
index 5a63f36a63..4d7fab765f 100644
--- a/ydb/core/testlib/ya.make
+++ b/ydb/core/testlib/ya.make
@@ -111,6 +111,7 @@ PEERDIR(
ydb/services/replication
ydb/services/monitoring
ydb/services/metadata/ds_table
+ ydb/services/tablet
ydb/services/ydb
ydb/core/http_proxy
diff --git a/ydb/services/tablet/ya.make b/ydb/services/tablet/ya.make
new file mode 100644
index 0000000000..a525f7899d
--- /dev/null
+++ b/ydb/services/tablet/ya.make
@@ -0,0 +1,15 @@
+LIBRARY()
+
+SRCS(
+ ydb_tablet.cpp
+)
+
+PEERDIR(
+ ydb/library/grpc/server
+ ydb/public/api/grpc/draft
+ ydb/core/grpc_services
+ ydb/core/grpc_services/base
+ ydb/core/grpc_services/tablet
+)
+
+END()
diff --git a/ydb/services/tablet/ydb_tablet.cpp b/ydb/services/tablet/ydb_tablet.cpp
new file mode 100644
index 0000000000..f6e78a3c4e
--- /dev/null
+++ b/ydb/services/tablet/ydb_tablet.cpp
@@ -0,0 +1,55 @@
+#include "ydb_tablet.h"
+
+#include <ydb/core/grpc_services/tablet/service_tablet.h>
+#include <ydb/core/grpc_services/grpc_helper.h>
+#include <ydb/core/grpc_services/base/base.h>
+
+namespace NKikimr::NGRpcService {
+
+TGRpcYdbTabletService::TGRpcYdbTabletService(
+ NActors::TActorSystem *system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TVector<NActors::TActorId>& proxies,
+ bool rlAllowed,
+ size_t handlersPerCompletionQueue)
+ : TBase(system, counters, proxies, rlAllowed)
+ , HandlersPerCompletionQueue(handlersPerCompletionQueue)
+{}
+
+void TGRpcYdbTabletService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
+ auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
+
+ size_t proxyCounter = 0;
+
+#ifdef ADD_REQUEST_LIMIT
+#error ADD_REQUEST_LIMIT macro already defined
+#endif
+
+#define ADD_REQUEST_LIMIT(NAME, CB, LIMIT_TYPE, ...) do { \
+ for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \
+ for (auto* cq: CQS) { \
+ auto proxy = GRpcProxies_[proxyCounter++ % GRpcProxies_.size()]; \
+ MakeIntrusive<TGRpcRequest<Ydb::Tablet::NAME##Request, Ydb::Tablet::NAME##Response, TGRpcYdbTabletService>> \
+ (this, &Service_, cq, \
+ [this, proxy](NYdbGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
+ ActorSystem_->Send(proxy, \
+ new TGrpcRequestNoOperationCall<Ydb::Tablet::NAME##Request, Ydb::Tablet::NAME##Response> \
+ (ctx, &CB, TRequestAuxSettings { \
+ .RlMode = RLSWITCH(TRateLimiterMode::LIMIT_TYPE), \
+ __VA_OPT__(.AuditMode = TAuditMode::__VA_ARGS__,) \
+ })); \
+ }, &Ydb::Tablet::V1::TabletService::AsyncService::Request ## NAME, \
+ #NAME, logger, getCounterBlock("tablet", #NAME))->Run(); \
+ } \
+ } \
+} while(0)
+
+ ADD_REQUEST_LIMIT(ExecuteTabletMiniKQL, DoExecuteTabletMiniKQLRequest, Rps, Auditable);
+ ADD_REQUEST_LIMIT(ChangeTabletSchema, DoChangeTabletSchemaRequest, Rps, Auditable);
+ ADD_REQUEST_LIMIT(RestartTablet, DoRestartTabletRequest, Rps);
+
+#undef ADD_REQUEST_LIMIT
+}
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/services/tablet/ydb_tablet.h b/ydb/services/tablet/ydb_tablet.h
new file mode 100644
index 0000000000..638314c495
--- /dev/null
+++ b/ydb/services/tablet/ydb_tablet.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include <ydb/library/actors/core/actorsystem.h>
+#include <ydb/library/grpc/server/grpc_server.h>
+#include <ydb/core/grpc_services/base/base_service.h>
+
+#include <ydb/public/api/grpc/draft/ydb_tablet_v1.grpc.pb.h>
+
+namespace NKikimr::NGRpcService {
+
+class TGRpcYdbTabletService
+ : public TGrpcServiceBase<Ydb::Tablet::V1::TabletService>
+{
+ using TBase = TGrpcServiceBase<Ydb::Tablet::V1::TabletService>;
+
+public:
+ TGRpcYdbTabletService(
+ NActors::TActorSystem *system,
+ TIntrusivePtr<::NMonitoring::TDynamicCounters> counters,
+ const TVector<NActors::TActorId>& proxies,
+ bool rlAllowed,
+ size_t handlersPerCompletionQueue = 1);
+
+private:
+ void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger);
+
+private:
+ const size_t HandlersPerCompletionQueue;
+};
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/services/ya.make b/ydb/services/ya.make
index 636feb6111..fc40a9c574 100644
--- a/ydb/services/ya.make
+++ b/ydb/services/ya.make
@@ -20,6 +20,7 @@ RECURSE(
persqueue_v1
rate_limiter
replication
+ tablet
ydb
ymq
)