diff options
author | Aleksei Borzenkov <snaury@ydb.tech> | 2024-08-29 14:00:26 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-08-29 11:00:26 +0000 |
commit | 9c8e625b136d7f671dafd60dbf69222fdbb8eaba (patch) | |
tree | 33cf9c3c4652e6304c849d692422658f7d49760f | |
parent | 3caa09fee6dc090b1bcfbf639a0488f3d60ca281 (diff) | |
download | ydb-9c8e625b136d7f671dafd60dbf69222fdbb8eaba.tar.gz |
Implement a new tablet administration grpc service (#8400)
36 files changed, 1280 insertions, 36 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index ef569e886f..cf05af4742 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -122,6 +122,7 @@ #include <ydb/services/ydb/ydb_scripting.h> #include <ydb/services/ydb/ydb_table.h> #include <ydb/services/ydb/ydb_object_storage.h> +#include <ydb/services/tablet/ydb_tablet.h> #include <ydb/core/fq/libs/init/init.h> @@ -598,6 +599,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { names["keyvalue"] = &hasKeyValue; TServiceCfg hasReplication = services.empty(); names["replication"] = &hasReplication; + TServiceCfg hasTabletService = services.empty(); + names["tablet_service"] = &hasTabletService; std::unordered_set<TString> enabled; for (const auto& name : services) { @@ -873,6 +876,11 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { grpcRequestProxies[0], hasReplication.IsRlAllowed())); } + if (hasTabletService) { + server.AddService(new NGRpcService::TGRpcYdbTabletService(ActorSystem.Get(), Counters, grpcRequestProxies, + hasTabletService.IsRlAllowed(), grpcConfig.GetHandlersPerCompletionQueue())); + } + if (ModuleFactories) { for (const auto& service : ModuleFactories->GrpcServiceFactory.Create(enabled, disabled, ActorSystem.Get(), Counters, grpcRequestProxies[0])) { server.AddService(service); diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make index b83ac0465e..30a23d8f65 100644 --- a/ydb/core/driver_lib/run/ya.make +++ b/ydb/core/driver_lib/run/ya.make @@ -165,6 +165,7 @@ PEERDIR( ydb/services/persqueue_v1 ydb/services/rate_limiter ydb/services/replication + ydb/services/tablet ydb/services/ydb ) diff --git a/ydb/core/engine/minikql/flat_local_tx_minikql.h b/ydb/core/engine/minikql/flat_local_tx_minikql.h index 71f197789a..62686fdf1b 100644 --- a/ydb/core/engine/minikql/flat_local_tx_minikql.h +++ b/ydb/core/engine/minikql/flat_local_tx_minikql.h @@ -106,7 +106,7 @@ class TFlatLocalMiniKQL : public NTabletFlatExecutor::ITransaction { bool PrepareParams(TTransactionContext &txc, const TAppData *appData) { Y_UNUSED(txc); if (SourceProgram.Params.Binary) { - SerializedMiniKQLParams = SourceProgram.Program.Binary; + SerializedMiniKQLParams = SourceProgram.Params.Binary; return true; } diff --git a/ydb/core/grpc_services/audit_dml_operations.cpp b/ydb/core/grpc_services/audit_dml_operations.cpp index 9d428d0c6b..272d064755 100644 --- a/ydb/core/grpc_services/audit_dml_operations.cpp +++ b/ydb/core/grpc_services/audit_dml_operations.cpp @@ -3,6 +3,7 @@ #include <ydb/public/api/protos/ydb_table.pb.h> #include <ydb/public/api/protos/ydb_scripting.pb.h> #include <ydb/public/api/protos/ydb_query.pb.h> +#include <ydb/public/api/protos/draft/ydb_tablet.pb.h> #include "base/base.h" @@ -196,4 +197,24 @@ void AuditContextAppend(IAuditCtx* ctx, const Ydb::Query::ExecuteScriptRequest& } // log updated_row_count collected from ExecuteScriptMetadata.exec_stats? +// TabletService, ExecuteTabletMiniKQL +template <> +void AuditContextAppend(IAuditCtx* ctx, const Ydb::Tablet::ExecuteTabletMiniKQLRequest& request) { + if (request.dry_run()) { + return; + } + ctx->AddAuditLogPart("tablet_id", TStringBuilder() << request.tablet_id()); + ctx->AddAuditLogPart("program_text", PrepareText(request.program())); +} + +// TabletService, ChangeTabletSchema +template <> +void AuditContextAppend(IAuditCtx* ctx, const Ydb::Tablet::ChangeTabletSchemaRequest& request) { + if (request.dry_run()) { + return; + } + ctx->AddAuditLogPart("tablet_id", TStringBuilder() << request.tablet_id()); + ctx->AddAuditLogPart("schema_changes", PrepareText(request.schema_changes())); +} + } // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/audit_dml_operations.h b/ydb/core/grpc_services/audit_dml_operations.h index 9c8b600d5a..f3145e708d 100644 --- a/ydb/core/grpc_services/audit_dml_operations.h +++ b/ydb/core/grpc_services/audit_dml_operations.h @@ -28,6 +28,13 @@ class ExecuteScriptRequest; } +namespace Ydb::Tablet { + +class ExecuteTabletMiniKQLRequest; +class ChangeTabletSchemaRequest; + +} + namespace NKikimr::NGRpcService { class IAuditCtx; @@ -80,4 +87,8 @@ template <> void AuditContextAppend(IAuditCtx* ctx, const Ydb::Query::ExecuteQue // ExecuteSrcipt template <> void AuditContextAppend(IAuditCtx* ctx, const Ydb::Query::ExecuteScriptRequest& request); +// TabletService +template <> void AuditContextAppend(IAuditCtx* ctx, const Ydb::Tablet::ExecuteTabletMiniKQLRequest& request); +template <> void AuditContextAppend(IAuditCtx* ctx, const Ydb::Tablet::ChangeTabletSchemaRequest& request); + } // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp index e25877095c..5f729d9907 100644 --- a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp +++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp @@ -71,7 +71,7 @@ public: return; } - Register(NKqp::CreateGetScriptExecutionResultActor(SelfId(), DatabaseName, ExecutionId, req->result_set_index(), RowsOffset, req->rows_limit(), req->rows_limit() ? 0 : MAX_SIZE_LIMIT, Request->GetDeadline())); + Register(NKqp::CreateGetScriptExecutionResultActor(SelfId(), GetDatabaseName(), ExecutionId, req->result_set_index(), RowsOffset, req->rows_limit(), req->rows_limit() ? 0 : MAX_SIZE_LIMIT, Request->GetDeadline())); Become(&TFetchScriptResultsRPC::StateFunc); } diff --git a/ydb/core/grpc_services/rpc_backup.cpp b/ydb/core/grpc_services/rpc_backup.cpp index 94a694d759..37760e8050 100644 --- a/ydb/core/grpc_services/rpc_backup.cpp +++ b/ydb/core/grpc_services/rpc_backup.cpp @@ -42,7 +42,7 @@ public: auto ev = MakeHolder<typename NSchemeShard::TEvBackup::TEvApiMapping<TIn>::TEv>(); ev->Record.SetTxId(this->TxId); - ev->Record.SetDatabaseName(this->DatabaseName); + ev->Record.SetDatabaseName(this->GetDatabaseName()); if (this->UserToken) { ev->Record.SetUserSID(this->UserToken->GetUserSID()); } diff --git a/ydb/core/grpc_services/rpc_cancel_operation.cpp b/ydb/core/grpc_services/rpc_cancel_operation.cpp index 885310b244..5b602ddc53 100644 --- a/ydb/core/grpc_services/rpc_cancel_operation.cpp +++ b/ydb/core/grpc_services/rpc_cancel_operation.cpp @@ -43,11 +43,11 @@ class TCancelOperationRPC: public TRpcOperationRequestActor<TCancelOperationRPC, IEventBase* MakeRequest() override { switch (OperationId.GetKind()) { case TOperationId::EXPORT: - return new TEvExport::TEvCancelExportRequest(TxId, DatabaseName, RawOperationId); + return new TEvExport::TEvCancelExportRequest(TxId, GetDatabaseName(), RawOperationId); case TOperationId::IMPORT: - return new TEvImport::TEvCancelImportRequest(TxId, DatabaseName, RawOperationId); + return new TEvImport::TEvCancelImportRequest(TxId, GetDatabaseName(), RawOperationId); case TOperationId::BUILD_INDEX: - return new TEvIndexBuilder::TEvCancelRequest(TxId, DatabaseName, RawOperationId); + return new TEvIndexBuilder::TEvCancelRequest(TxId, GetDatabaseName(), RawOperationId); default: Y_ABORT("unreachable"); } @@ -141,7 +141,7 @@ public: } void SendCancelScriptExecutionOperation() { - Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvCancelScriptExecutionOperation(DatabaseName, OperationId)); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvCancelScriptExecutionOperation(GetDatabaseName(), OperationId)); } private: diff --git a/ydb/core/grpc_services/rpc_export.cpp b/ydb/core/grpc_services/rpc_export.cpp index 6bf1b5c713..572b9c34e4 100644 --- a/ydb/core/grpc_services/rpc_export.cpp +++ b/ydb/core/grpc_services/rpc_export.cpp @@ -37,7 +37,7 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, auto ev = MakeHolder<TEvExport::TEvCreateExportRequest>(); ev->Record.SetTxId(this->TxId); - ev->Record.SetDatabaseName(this->DatabaseName); + ev->Record.SetDatabaseName(this->GetDatabaseName()); if (this->UserToken) { ev->Record.SetUserSID(this->UserToken->GetUserSID()); } @@ -64,7 +64,7 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, TVector<TString> ExtractPaths() { TVector<TString> paths; - paths.emplace_back(this->DatabaseName); // first entry is database + paths.emplace_back(this->GetDatabaseName()); // first entry is database ExtractPaths(paths, this->GetProtoRequest()->settings()); return paths; @@ -74,7 +74,7 @@ class TExportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, Y_ABORT_UNLESS(!paths.empty()); auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); - request->DatabaseName = this->DatabaseName; + request->DatabaseName = this->GetDatabaseName(); for (const auto& path : paths) { auto& entry = request->ResultSet.emplace_back(); diff --git a/ydb/core/grpc_services/rpc_forget_operation.cpp b/ydb/core/grpc_services/rpc_forget_operation.cpp index 4619222b6d..2dca82f6a0 100644 --- a/ydb/core/grpc_services/rpc_forget_operation.cpp +++ b/ydb/core/grpc_services/rpc_forget_operation.cpp @@ -44,11 +44,11 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC, IEventBase* MakeRequest() override { switch (OperationId.GetKind()) { case TOperationId::EXPORT: - return new TEvExport::TEvForgetExportRequest(TxId, DatabaseName, RawOperationId); + return new TEvExport::TEvForgetExportRequest(TxId, GetDatabaseName(), RawOperationId); case TOperationId::IMPORT: - return new TEvImport::TEvForgetImportRequest(TxId, DatabaseName, RawOperationId); + return new TEvImport::TEvForgetImportRequest(TxId, GetDatabaseName(), RawOperationId); case TOperationId::BUILD_INDEX: - return new TEvIndexBuilder::TEvForgetRequest(TxId, DatabaseName, RawOperationId); + return new TEvIndexBuilder::TEvForgetRequest(TxId, GetDatabaseName(), RawOperationId); default: Y_ABORT("unreachable"); } @@ -97,7 +97,7 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC, } void SendForgetScriptExecutionOperation() { - Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId)); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(GetDatabaseName(), OperationId)); } public: diff --git a/ydb/core/grpc_services/rpc_get_operation.cpp b/ydb/core/grpc_services/rpc_get_operation.cpp index 3fdaa8b1d2..a4a8e3b765 100644 --- a/ydb/core/grpc_services/rpc_get_operation.cpp +++ b/ydb/core/grpc_services/rpc_get_operation.cpp @@ -56,11 +56,11 @@ class TGetOperationRPC : public TRpcOperationRequestActor<TGetOperationRPC, TEvG IEventBase* MakeRequest() override { switch (OperationId_.GetKind()) { case TOperationId::EXPORT: - return new NSchemeShard::TEvExport::TEvGetExportRequest(DatabaseName, RawOperationId_); + return new NSchemeShard::TEvExport::TEvGetExportRequest(GetDatabaseName(), RawOperationId_); case TOperationId::IMPORT: - return new NSchemeShard::TEvImport::TEvGetImportRequest(DatabaseName, RawOperationId_); + return new NSchemeShard::TEvImport::TEvGetImportRequest(GetDatabaseName(), RawOperationId_); case TOperationId::BUILD_INDEX: - return new NSchemeShard::TEvIndexBuilder::TEvGetRequest(DatabaseName, RawOperationId_); + return new NSchemeShard::TEvIndexBuilder::TEvGetRequest(GetDatabaseName(), RawOperationId_); default: Y_ABORT("unreachable"); } @@ -199,7 +199,7 @@ private: } void SendGetScriptExecutionOperation() { - Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvGetScriptExecutionOperation(DatabaseName, OperationId_)); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvGetScriptExecutionOperation(GetDatabaseName(), OperationId_)); } void Handle(NSchemeShard::TEvExport::TEvGetExportResponse::TPtr& ev, const TActorContext& ctx) { diff --git a/ydb/core/grpc_services/rpc_import.cpp b/ydb/core/grpc_services/rpc_import.cpp index 174e9df84c..89061b2f8b 100644 --- a/ydb/core/grpc_services/rpc_import.cpp +++ b/ydb/core/grpc_services/rpc_import.cpp @@ -35,7 +35,7 @@ class TImportRPC: public TRpcOperationRequestActor<TDerived, TEvRequest, true>, auto ev = MakeHolder<TEvImport::TEvCreateImportRequest>(); ev->Record.SetTxId(this->TxId); - ev->Record.SetDatabaseName(this->DatabaseName); + ev->Record.SetDatabaseName(this->GetDatabaseName()); if (this->UserToken) { ev->Record.SetUserSID(this->UserToken->GetUserSID()); } diff --git a/ydb/core/grpc_services/rpc_import_data.cpp b/ydb/core/grpc_services/rpc_import_data.cpp index 250421386d..75565ebcb3 100644 --- a/ydb/core/grpc_services/rpc_import_data.cpp +++ b/ydb/core/grpc_services/rpc_import_data.cpp @@ -120,7 +120,7 @@ class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataReque void ResolvePath() { auto request = MakeHolder<TNavigate>(); - request->DatabaseName = NKikimr::CanonizePath(DatabaseName); + request->DatabaseName = NKikimr::CanonizePath(GetDatabaseName()); auto& entry = request->ResultSet.emplace_back(); entry.Operation = TNavigate::OpTable; @@ -179,7 +179,7 @@ class TImportDataRPC: public TRpcRequestActor<TImportDataRPC, TEvImportDataReque void ResolveKeys() { auto request = MakeHolder<TResolve>(); - request->DatabaseName = NKikimr::CanonizePath(DatabaseName); + request->DatabaseName = NKikimr::CanonizePath(GetDatabaseName()); request->ResultSet.emplace_back(std::move(KeyDesc)); request->ResultSet.back().Access = NACLib::UpdateRow; diff --git a/ydb/core/grpc_services/rpc_list_operations.cpp b/ydb/core/grpc_services/rpc_list_operations.cpp index 20742675b5..47805e7b56 100644 --- a/ydb/core/grpc_services/rpc_list_operations.cpp +++ b/ydb/core/grpc_services/rpc_list_operations.cpp @@ -54,13 +54,13 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T switch (ParseKind(GetProtoRequest()->kind())) { case TOperationId::SS_BG_TASKS: - return new NSchemeShard::NBackground::TEvListRequest(DatabaseName, request.page_size(), request.page_token()); + return new NSchemeShard::NBackground::TEvListRequest(GetDatabaseName(), request.page_size(), request.page_token()); case TOperationId::EXPORT: - return new TEvExport::TEvListExportsRequest(DatabaseName, request.page_size(), request.page_token(), request.kind()); + return new TEvExport::TEvListExportsRequest(GetDatabaseName(), request.page_size(), request.page_token(), request.kind()); case TOperationId::IMPORT: - return new TEvImport::TEvListImportsRequest(DatabaseName, request.page_size(), request.page_token(), request.kind()); + return new TEvImport::TEvListImportsRequest(GetDatabaseName(), request.page_size(), request.page_token(), request.kind()); case TOperationId::BUILD_INDEX: - return new TEvIndexBuilder::TEvListRequest(DatabaseName, request.page_size(), request.page_token()); + return new TEvIndexBuilder::TEvListRequest(GetDatabaseName(), request.page_size(), request.page_token()); default: Y_ABORT("unreachable"); } @@ -141,7 +141,7 @@ class TListOperationsRPC: public TRpcOperationRequestActor<TListOperationsRPC, T } void SendListScriptExecutions() { - Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvListScriptExecutionOperations(DatabaseName, GetProtoRequest()->page_size(), GetProtoRequest()->page_token())); + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvListScriptExecutionOperations(GetDatabaseName(), GetProtoRequest()->page_size(), GetProtoRequest()->page_token())); } void Handle(NKqp::TEvListScriptExecutionOperationsResponse::TPtr& ev) { diff --git a/ydb/core/grpc_services/rpc_login.cpp b/ydb/core/grpc_services/rpc_login.cpp index f259181d2e..b80e713d74 100644 --- a/ydb/core/grpc_services/rpc_login.cpp +++ b/ydb/core/grpc_services/rpc_login.cpp @@ -43,7 +43,7 @@ public: const Ydb::Auth::LoginRequest* protoRequest = GetProtoRequest(); Credentials = PrepareCredentials(protoRequest->user(), protoRequest->password(), AppData()->AuthConfig); TString domainName = "/" + AppData()->DomainsInfo->GetDomain()->Name; - PathToDatabase = AppData()->AuthConfig.GetDomainLoginOnly() ? domainName : DatabaseName; + PathToDatabase = AppData()->AuthConfig.GetDomainLoginOnly() ? domainName : GetDatabaseName(); auto sendParameters = GetSendParameters(Credentials, PathToDatabase); Send(sendParameters.Recipient, sendParameters.Event.Release()); Become(&TThis::StateWork, Timeout, new TEvents::TEvWakeup()); diff --git a/ydb/core/grpc_services/rpc_operation_request_base.h b/ydb/core/grpc_services/rpc_operation_request_base.h index cf8cca65c1..14548d11df 100644 --- a/ydb/core/grpc_services/rpc_operation_request_base.h +++ b/ydb/core/grpc_services/rpc_operation_request_base.h @@ -47,14 +47,14 @@ protected: void ResolveDatabase() { LOG_D("Resolve database" - << ": name# " << this->DatabaseName); + << ": name# " << this->GetDatabaseName()); auto request = MakeHolder<NSchemeCache::TSchemeCacheNavigate>(); - request->DatabaseName = this->DatabaseName; + request->DatabaseName = this->GetDatabaseName(); auto& entry = request->ResultSet.emplace_back(); entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; - entry.Path = NKikimr::SplitPath(this->DatabaseName); + entry.Path = NKikimr::SplitPath(this->GetDatabaseName()); this->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request.Release())); } diff --git a/ydb/core/grpc_services/rpc_request_base.h b/ydb/core/grpc_services/rpc_request_base.h index 3a8ff0f578..e1ec39ace2 100644 --- a/ydb/core/grpc_services/rpc_request_base.h +++ b/ydb/core/grpc_services/rpc_request_base.h @@ -128,10 +128,16 @@ public: explicit TRpcRequestActor(TRequestCtx* ev) : Request(ev) - , DatabaseName(Request->GetDatabaseName().GetOrElse(DatabaseFromDomain(AppData()))) + , UserToken(CreateUserToken(Request.Get())) { - if (const auto& userToken = Request->GetSerializedToken()) { - UserToken = MakeHolder<NACLib::TUserToken>(userToken); + } + +private: + static THolder<const NACLib::TUserToken> CreateUserToken(TRequestCtx* request) { + if (const auto& userToken = request->GetSerializedToken()) { + return MakeHolder<NACLib::TUserToken>(userToken); + } else { + return {}; } } @@ -164,9 +170,24 @@ private: } protected: - THolder<TRequestCtx> Request; - const TString DatabaseName; - THolder<const NACLib::TUserToken> UserToken; + const TString& GetDatabaseName() const { + if (!DatabaseName_.has_value()) { + auto name = Request->GetDatabaseName(); + if (name) { + DatabaseName_.emplace(std::move(*name)); + } else { + DatabaseName_.emplace(DatabaseFromDomain(AppData())); + } + } + return *DatabaseName_; + } + +protected: + const THolder<TRequestCtx> Request; + const THolder<const NACLib::TUserToken> UserToken; + +private: + mutable std::optional<TString> DatabaseName_; }; // TRpcRequestActor diff --git a/ydb/core/grpc_services/tablet/rpc_change_schema.cpp b/ydb/core/grpc_services/tablet/rpc_change_schema.cpp new file mode 100644 index 0000000000..72a2f5af84 --- /dev/null +++ b/ydb/core/grpc_services/tablet/rpc_change_schema.cpp @@ -0,0 +1,160 @@ +#include "rpc_change_schema.h" +#include "service_tablet.h" + +#include <ydb/core/grpc_services/rpc_request_base.h> +#include <ydb/core/grpc_services/audit_dml_operations.h> +#include <ydb/core/base/tablet.h> +#include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/protos/scheme_log.pb.h> + +#include <library/cpp/protobuf/json/json2proto.h> +#include <library/cpp/protobuf/json/proto2json.h> + +namespace NKikimr::NGRpcService { + +class TRpcChangeTabletSchema : public TRpcRequestActor<TRpcChangeTabletSchema, TEvChangeTabletSchemaRequest> { + using TBase = TRpcRequestActor<TRpcChangeTabletSchema, TEvChangeTabletSchemaRequest>; + +public: + using TBase::TBase; + + void Bootstrap() { + if (!CheckAccess()) { + auto error = TStringBuilder() << "Access denied"; + if (this->UserToken) { + error << ": '" << this->UserToken->GetUserSID() << "' is not an admin"; + } + + this->Reply(Ydb::StatusIds::UNAUTHORIZED, NKikimrIssues::TIssuesIds::ACCESS_DENIED, error); + return; + } + + auto* req = this->GetProtoRequest(); + AuditContextAppend(Request.Get(), *req); + + try { + TabletId = req->tablet_id(); + TabletReq = std::make_unique<TEvTablet::TEvLocalSchemeTx>(); + if (const auto& changes = req->schema_changes(); !changes.empty()) { + NProtobufJson::Json2Proto(changes, *TabletReq->Record.MutableSchemeChanges(), { + .FieldNameMode = NProtobufJson::TJson2ProtoConfig::FieldNameSnakeCaseDense, + .AllowUnknownFields = false, + .MapAsObject = true, + .EnumValueMode = NProtobufJson::TJson2ProtoConfig::EnumSnakeCaseInsensitive, + }); + } + TabletReq->Record.SetDryRun(req->dry_run()); + } catch (const std::exception& e) { + this->Reply(Ydb::StatusIds::BAD_REQUEST, e.what()); + return; + } + + PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy{ + // We need at least one retry since local resolver cache may be outdated + .RetryLimitCount = 1, + })); + + Schedule(TDuration::Seconds(60), new TEvents::TEvWakeup); + + Become(&TThis::StateWork); + } + +private: + bool CheckAccess() const { + if (AppData()->AdministrationAllowedSIDs.empty()) { + return true; + } + + if (!this->UserToken) { + return false; + } + + for (const auto& sid : AppData()->AdministrationAllowedSIDs) { + if (this->UserToken->IsExist(sid)) { + return true; + } + } + + return false; + } + +private: + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTabletPipe::TEvClientConnected, Handle); + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + hFunc(TEvTablet::TEvLocalSchemeTxResponse, Handle); + hFunc(TEvents::TEvWakeup, Handle); + } + } + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { + auto* msg = ev->Get(); + if (msg->Status != NKikimrProto::OK) { + this->Reply(Ydb::StatusIds::UNAVAILABLE, + TStringBuilder() << "Tablet " << TabletId << " is unavailable"); + return; + } + + NTabletPipe::SendData(SelfId(), PipeClient, TabletReq.release()); + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) { + this->Reply(Ydb::StatusIds::UNDETERMINED, + TStringBuilder() << "Tablet " << TabletId << " disconnected"); + } + + void Handle(TEvTablet::TEvLocalSchemeTxResponse::TPtr& ev) { + NTabletPipe::CloseClient(SelfId(), PipeClient); + + auto* msg = ev->Get(); + if (msg->Record.GetStatus() != NKikimrProto::OK) { + this->Reply(Ydb::StatusIds::GENERIC_ERROR, + msg->Record.HasErrorReason() ? msg->Record.GetErrorReason() : "Unknown error"); + return; + } + + auto* response = google::protobuf::Arena::CreateMessage<Ydb::Tablet::ChangeTabletSchemaResponse>(Request->GetArena()); + response->set_status(Ydb::StatusIds::SUCCESS); + if (msg->Record.HasFullScheme()) { + try { + TString text; + NProtobufJson::Proto2Json(msg->Record.GetFullScheme(), text, { + .EnumMode = NProtobufJson::TProto2JsonConfig::EnumName, + .FieldNameMode = NProtobufJson::TProto2JsonConfig::FieldNameSnakeCaseDense, + .MapAsObject = true, + }); + response->set_schema(std::move(text)); + } catch (const std::exception& e) { + response->set_status(Ydb::StatusIds::GENERIC_ERROR); + auto* issue = response->add_issues(); + issue->set_severity(NYql::TSeverityIds::S_ERROR); + issue->set_message(e.what()); + } + } + Request->Reply(response, response->status()); + PassAway(); + } + + void Handle(TEvents::TEvWakeup::TPtr&) { + NTabletPipe::CloseClient(SelfId(), PipeClient); + this->Reply(Ydb::StatusIds::TIMEOUT, + TStringBuilder() << "Tablet " << TabletId << " is not responding"); + } + +private: + ui64 TabletId; + std::unique_ptr<TEvTablet::TEvLocalSchemeTx> TabletReq; + TActorId PipeClient; +}; + +void DoChangeTabletSchemaRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) { + f.RegisterActor(new TRpcChangeTabletSchema(p.release())); +} + +template<> +IActor* TEvChangeTabletSchemaRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestNoOpCtx* msg) { + return new TRpcChangeTabletSchema(msg); +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/tablet/rpc_change_schema.h b/ydb/core/grpc_services/tablet/rpc_change_schema.h new file mode 100644 index 0000000000..59566e28fc --- /dev/null +++ b/ydb/core/grpc_services/tablet/rpc_change_schema.h @@ -0,0 +1,11 @@ +#pragma once +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/public/api/protos/draft/ydb_tablet.pb.h> + +namespace NKikimr::NGRpcService { + +using TEvChangeTabletSchemaRequest = TGrpcRequestNoOperationCall< + Ydb::Tablet::ChangeTabletSchemaRequest, + Ydb::Tablet::ChangeTabletSchemaResponse>; + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/tablet/rpc_change_schema_ut.cpp b/ydb/core/grpc_services/tablet/rpc_change_schema_ut.cpp new file mode 100644 index 0000000000..1228ab5a3f --- /dev/null +++ b/ydb/core/grpc_services/tablet/rpc_change_schema_ut.cpp @@ -0,0 +1,130 @@ +#include "rpc_change_schema.h" +#include <ydb/core/testlib/test_client.h> +#include <ydb/core/testlib/tablet_helpers.h> +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NGRpcService { + +using namespace Tests; + +Y_UNIT_TEST_SUITE(TabletService_ChangeSchema) { + + NThreading::TFuture<Ydb::Tablet::ChangeTabletSchemaResponse> ChangeSchema( + TTestActorRuntime& runtime, ui64 tabletId, const TString& changes, + const TString& token = {}, + bool dryRun = false) + { + // Cerr << "ChangeSchema: <<<" << changes << ">>>" << Endl; + Ydb::Tablet::ChangeTabletSchemaRequest request; + request.set_tablet_id(tabletId); + request.set_schema_changes(changes); + request.set_dry_run(dryRun); + return NRpcService::DoLocalRpc<TEvChangeTabletSchemaRequest>( + std::move(request), "/Root", token, runtime.GetActorSystem(0)); + } + + TString MakeSchemaChange() { + return R"__( + {"delta": [ + {"delta_type": "AddTable", + "table_id": 5555, + "table_name": "MyAwesomeTable"}, + {"delta_type": "AddColumn", + "table_id": 5555, + "column_id": 1, + "column_name": "MyAwesomeKey", + "column_type": 4}, + {"delta_type": "AddColumnToKey", + "table_id": 5555, + "column_id": 1} + ]} + )__"; + } + + Y_UNIT_TEST(Basics) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + + ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain); + + Cerr << "... reading schema" << Endl; + auto future = ChangeSchema(runtime, schemeShardId, ""); + auto result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result:\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + UNIT_ASSERT_C(result.schema().StartsWith(R"__({"delta":[{"delta_type":"AddTable","table_id":1,"table_name":"Paths"},)__"), result.schema()); + + Cerr << "... changing schema (dry run)" << Endl; + future = ChangeSchema(runtime, schemeShardId, MakeSchemaChange(), {}, /* dryRun */ true); + result = runtime.WaitFuture(std::move(future)); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + UNIT_ASSERT_C(result.schema().Contains(R"__({"delta_type":"AddTable","table_id":5555,"table_name":"MyAwesomeTable"})__"), result.schema()); + + Cerr << "... reading schema" << Endl; + future = ChangeSchema(runtime, schemeShardId, ""); + result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result:\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + UNIT_ASSERT_C(!result.schema().Contains("MyAwesomeTable"), result.schema()); + + Cerr << "... changing schema" << Endl; + future = ChangeSchema(runtime, schemeShardId, MakeSchemaChange()); + result = runtime.WaitFuture(std::move(future)); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + UNIT_ASSERT_C(result.schema().Contains(R"__({"delta_type":"AddTable","table_id":5555,"table_name":"MyAwesomeTable"})__"), result.schema()); + + Cerr << "... reading schema" << Endl; + future = ChangeSchema(runtime, schemeShardId, ""); + result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result:\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + UNIT_ASSERT_C(result.schema().Contains(R"__({"delta_type":"AddTable","table_id":5555,"table_name":"MyAwesomeTable"})__"), result.schema()); + } + + Y_UNIT_TEST(OnlyAdminsAllowed) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + runtime.GetAppData().AdministrationAllowedSIDs.push_back("root@builtin"); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + + ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain); + + Cerr << "... reading schema (without token)" << Endl; + auto future = ChangeSchema(runtime, schemeShardId, ""); + auto result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result:\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::UNAUTHORIZED, result.DebugString()); + + Cerr << "... reading schema (non-admin token)" << Endl; + future = ChangeSchema(runtime, schemeShardId, "", NACLib::TUserToken("user@builtin", {}).SerializeAsString()); + result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result:\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::UNAUTHORIZED, result.DebugString()); + + Cerr << "... reading schema (admin token)" << Endl; + future = ChangeSchema(runtime, schemeShardId, "", NACLib::TUserToken("root@builtin", {}).SerializeAsString()); + result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result:\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + } + +} // Y_UNIT_TEST_SUITE(TabletService_ChangeSchema) + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/tablet/rpc_execute_mkql.cpp b/ydb/core/grpc_services/tablet/rpc_execute_mkql.cpp new file mode 100644 index 0000000000..921730f520 --- /dev/null +++ b/ydb/core/grpc_services/tablet/rpc_execute_mkql.cpp @@ -0,0 +1,208 @@ +#include "rpc_execute_mkql.h" +#include "service_tablet.h" + +#include <ydb/core/grpc_services/rpc_request_base.h> +#include <ydb/core/grpc_services/audit_dml_operations.h> +#include <ydb/core/base/tablet.h> +#include <ydb/core/base/tablet_pipe.h> +#include <ydb/core/protos/tx_proxy.pb.h> + +#include <ydb/library/mkql_proto/mkql_proto.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/mkql_alloc.h> +#include <ydb/library/yql/minikql/mkql_mem_info.h> +#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/minikql/mkql_node_serialization.h> + +namespace NKikimr::NGRpcService { + + +class TRpcExecuteTabletMiniKQL : public TRpcRequestActor<TRpcExecuteTabletMiniKQL, TEvExecuteTabletMiniKQLRequest> { + using TBase = TRpcRequestActor<TRpcExecuteTabletMiniKQL, TEvExecuteTabletMiniKQLRequest>; + +public: + using TBase::TBase; + + void Bootstrap() { + if (!CheckAccess()) { + auto error = TStringBuilder() << "Access denied"; + if (this->UserToken) { + error << ": '" << this->UserToken->GetUserSID() << "' is not an admin"; + } + + this->Reply(Ydb::StatusIds::UNAUTHORIZED, NKikimrIssues::TIssuesIds::ACCESS_DENIED, error); + return; + } + + auto* req = this->GetProtoRequest(); + AuditContextAppend(Request.Get(), *req); + + try { + TabletId = req->tablet_id(); + TabletReq = std::make_unique<TEvTablet::TEvLocalMKQL>(); + auto* tx = TabletReq->Record.MutableProgram(); + tx->MutableProgram()->SetText(req->program()); + if (const auto& params = req->parameters(); !params.empty()) { + auto* functionRegistry = AppData()->FunctionRegistry; + NMiniKQL::TScopedAlloc alloc(__LOCATION__, TAlignedPagePoolCounters(), functionRegistry->SupportsSizedAllocators()); + NMiniKQL::TTypeEnvironment env(alloc); + NMiniKQL::TMemoryUsageInfo memInfo("TRpcExecuteTabletMiniKQL"); + NMiniKQL::THolderFactory factory(alloc.Ref(), memInfo, functionRegistry); + // NKikimrMiniKQL.TParams + auto* protoParams = tx->MutableParams()->MutableProto(); + protoParams->MutableType()->SetKind(NKikimrMiniKQL::ETypeKind::Struct); + auto* protoStructType = protoParams->MutableType()->MutableStruct(); + auto* protoValue = protoParams->MutableValue(); + for (const auto& pr : params) { + auto* protoMember = protoStructType->AddMember(); + protoMember->SetName(pr.first); + auto [pType, value] = NMiniKQL::ImportValueFromProto(pr.second.type(), pr.second.value(), env, factory); + ExportTypeToProto(pType, *protoMember->MutableType()); + ExportValueToProto(pType, value, *protoValue->AddStruct()); + } + // Tablet needs a serialized runtime node for parameters + auto node = NMiniKQL::ImportValueFromProto(*protoParams, env); + tx->MutableParams()->SetBin(NMiniKQL::SerializeRuntimeNode(node, env)); + // We no longer need the protobuf parameters + tx->MutableParams()->ClearProto(); + } + + if (req->dry_run()) { + tx->SetMode(NKikimrTxUserProxy::TMiniKQLTransaction::COMPILE); + } + } catch (const std::exception& e) { + this->Reply(Ydb::StatusIds::BAD_REQUEST, e.what()); + return; + } + + PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy{ + // We need at least one retry since local resolver cache may be outdated + .RetryLimitCount = 1, + })); + + Schedule(TDuration::Seconds(60), new TEvents::TEvWakeup); + + Become(&TThis::StateWork); + } + +private: + bool CheckAccess() const { + if (AppData()->AdministrationAllowedSIDs.empty()) { + return true; + } + + if (!this->UserToken) { + return false; + } + + for (const auto& sid : AppData()->AdministrationAllowedSIDs) { + if (this->UserToken->IsExist(sid)) { + return true; + } + } + + return false; + } + +private: + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTabletPipe::TEvClientConnected, Handle); + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + hFunc(TEvTablet::TEvLocalMKQLResponse, Handle); + hFunc(TEvents::TEvWakeup, Handle); + } + } + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { + auto* msg = ev->Get(); + if (msg->Status != NKikimrProto::OK) { + this->Reply(Ydb::StatusIds::UNAVAILABLE, + TStringBuilder() << "Tablet " << TabletId << " is unavailable"); + return; + } + + NTabletPipe::SendData(SelfId(), PipeClient, TabletReq.release()); + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) { + this->Reply(Ydb::StatusIds::UNDETERMINED, + TStringBuilder() << "Tablet " << TabletId << " disconnected"); + } + + void Handle(TEvTablet::TEvLocalMKQLResponse::TPtr& ev) { + NTabletPipe::CloseClient(SelfId(), PipeClient); + + auto* msg = ev->Get(); + auto* response = google::protobuf::Arena::CreateMessage<Ydb::Tablet::ExecuteTabletMiniKQLResponse>(Request->GetArena()); + + if (msg->Record.HasCompileResults()) { + for (const auto& issue : msg->Record.GetCompileResults().GetProgramCompileErrors()) { + *response->add_issues() = issue; + } + for (const auto& issue : msg->Record.GetCompileResults().GetParamsCompileErrors()) { + *response->add_issues() = issue; + } + } + + if (const TString& errors = msg->Record.GetMiniKQLErrors(); !errors.empty()) { + auto* issue = response->add_issues(); + issue->set_severity(NYql::TSeverityIds::S_ERROR); + issue->set_message(errors); + } + + if (msg->Record.GetStatus() != NKikimrProto::OK) { + response->set_status(Ydb::StatusIds::GENERIC_ERROR); + Request->Reply(response, response->status()); + return PassAway(); + } + + response->set_status(Ydb::StatusIds::SUCCESS); + + if (msg->Record.HasExecutionEngineEvaluatedResponse()) { + try { + const auto& protoResult = msg->Record.GetExecutionEngineEvaluatedResponse(); + auto* functionRegistry = AppData()->FunctionRegistry; + NMiniKQL::TScopedAlloc alloc(__LOCATION__, TAlignedPagePoolCounters(), functionRegistry->SupportsSizedAllocators()); + NMiniKQL::TTypeEnvironment env(alloc); + NMiniKQL::TMemoryUsageInfo memInfo("TRpcExecuteTabletMiniKQL"); + NMiniKQL::THolderFactory factory(alloc.Ref(), memInfo, functionRegistry); + auto [pType, value] = NMiniKQL::ImportValueFromProto(protoResult.GetType(), protoResult.GetValue(), env, factory); + ExportTypeToProto(pType, *response->mutable_result()->mutable_type()); + ExportValueToProto(pType, value, *response->mutable_result()->mutable_value()); + } catch (const std::exception& e) { + response->set_status(Ydb::StatusIds::GENERIC_ERROR); + auto* issue = response->add_issues(); + issue->set_severity(NYql::TSeverityIds::S_ERROR); + issue->set_message(e.what()); + Request->Reply(response, response->status()); + return PassAway(); + } + } + + Request->Reply(response, response->status()); + return PassAway(); + } + + void Handle(TEvents::TEvWakeup::TPtr&) { + NTabletPipe::CloseClient(SelfId(), PipeClient); + this->Reply(Ydb::StatusIds::TIMEOUT, + TStringBuilder() << "Tablet " << TabletId << " is not responding"); + } + +private: + ui64 TabletId; + std::unique_ptr<TEvTablet::TEvLocalMKQL> TabletReq; + TActorId PipeClient; +}; + +void DoExecuteTabletMiniKQLRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) { + f.RegisterActor(new TRpcExecuteTabletMiniKQL(p.release())); +} + +template<> +IActor* TEvExecuteTabletMiniKQLRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestNoOpCtx* msg) { + return new TRpcExecuteTabletMiniKQL(msg); +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/tablet/rpc_execute_mkql.h b/ydb/core/grpc_services/tablet/rpc_execute_mkql.h new file mode 100644 index 0000000000..45c5f4421d --- /dev/null +++ b/ydb/core/grpc_services/tablet/rpc_execute_mkql.h @@ -0,0 +1,11 @@ +#pragma once +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/public/api/protos/draft/ydb_tablet.pb.h> + +namespace NKikimr::NGRpcService { + +using TEvExecuteTabletMiniKQLRequest = TGrpcRequestNoOperationCall< + Ydb::Tablet::ExecuteTabletMiniKQLRequest, + Ydb::Tablet::ExecuteTabletMiniKQLResponse>; + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/tablet/rpc_execute_mkql_ut.cpp b/ydb/core/grpc_services/tablet/rpc_execute_mkql_ut.cpp new file mode 100644 index 0000000000..9eb251ec44 --- /dev/null +++ b/ydb/core/grpc_services/tablet/rpc_execute_mkql_ut.cpp @@ -0,0 +1,285 @@ +#include "rpc_execute_mkql.h" +#include <ydb/core/testlib/test_client.h> +#include <ydb/core/testlib/tablet_helpers.h> +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NGRpcService { + +using namespace Tests; + +Y_UNIT_TEST_SUITE(TabletService_ExecuteMiniKQL) { + + Ydb::TypedValue MakeUint64(ui64 value) { + Ydb::TypedValue ret; + ret.mutable_type()->set_type_id(Ydb::Type::UINT64); + ret.mutable_value()->set_uint64_value(value); + return ret; + } + + NThreading::TFuture<Ydb::Tablet::ExecuteTabletMiniKQLResponse> ExecuteMiniKQL( + TTestActorRuntime& runtime, ui64 tabletId, const TString& program, + const std::unordered_map<TString, Ydb::TypedValue>& params = {}, + const TString& token = {}, + bool dryRun = false) + { + // Cerr << "ExecuteMiniKQL: <<<" << program << ">>>" << Endl; + Ydb::Tablet::ExecuteTabletMiniKQLRequest request; + request.set_tablet_id(tabletId); + request.set_program(program); + for (const auto& pr : params) { + (*request.mutable_parameters())[pr.first] = pr.second; + } + request.set_dry_run(dryRun); + return NRpcService::DoLocalRpc<TEvExecuteTabletMiniKQLRequest>( + std::move(request), "/Root", token, runtime.GetActorSystem(0)); + } + + Y_UNIT_TEST(BasicMiniKQLRead) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + + ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain); + auto future = ExecuteMiniKQL(runtime, schemeShardId, R"___(( + (let key '('('Id (Uint64 '1)))) + (let select '('Id 'Name)) + (return (AsList + (SetResult 'row (SelectRow 'Paths key select)) + )) + ))___"); + auto result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result:\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + UNIT_ASSERT_VALUES_EQUAL_C( + result.result().value().ShortDebugString(), + "items { items { uint64_value: 1 } items { text_value: \"Root\" } }", + result.DebugString()); + } + + Y_UNIT_TEST(ParamsMiniKQLRead) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + + ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain); + auto future = ExecuteMiniKQL(runtime, schemeShardId, R"___(( + (let p (Parameter 'p (DataType 'Uint64))) + (let key '('('Id p))) + (let select '('Id 'Name)) + (return (AsList + (SetResult 'p p) + (SetResult 'row (SelectRow 'Paths key select)) + )) + ))___", {{"p", MakeUint64(1)}}); + auto result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result:\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + UNIT_ASSERT_VALUES_EQUAL_C( + result.result().value().ShortDebugString(), + "items { uint64_value: 1 } items { items { uint64_value: 1 } items { text_value: \"Root\" } }", + result.DebugString()); + } + + Ydb::TypedValue MakeMalformedValue() { + Ydb::TypedValue ret; + // Type is a struct with 2 members + Ydb::StructType* s = ret.mutable_type()->mutable_struct_type(); + Ydb::StructMember* m1 = s->add_members(); + m1->set_name("m1"); + m1->mutable_type()->set_type_id(Ydb::Type::UINT64); + Ydb::StructMember* m2 = s->add_members(); + m2->set_name("m2"); + m2->mutable_type()->set_type_id(Ydb::Type::UINT64); + // Value has only one member: malformed + ret.mutable_value()->add_items()->set_uint64_value(42); + return ret; + } + + Y_UNIT_TEST(MalformedParams) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + + ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain); + auto future = ExecuteMiniKQL(runtime, schemeShardId, R"___(( + (let p (Parameter 'p (DataType 'Uint64))) + (let key '('('Id p))) + (let select '('Id 'Name)) + (return (AsList + (SetResult 'p p) + (SetResult 'row (SelectRow 'Paths key select)) + )) + ))___", {{"p", MakeMalformedValue()}}); + auto result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result:\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::BAD_REQUEST, result.DebugString()); + } + + Y_UNIT_TEST(MalformedProgram) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + + ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain); + auto future = ExecuteMiniKQL(runtime, schemeShardId, R"___(( + (let key '('('Id (Uint64 '1)))) + (let select '('Id 'Name)) + (return (AsList + (SetResult 'row (SelectRow 'NoSuchTable key select)) + )) + ))___"); + auto result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result:\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::GENERIC_ERROR, result.DebugString()); + } + + Y_UNIT_TEST(DryRunEraseRow) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + + ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain); + auto future = ExecuteMiniKQL(runtime, schemeShardId, R"___(( + (let key '('('Id (Uint64 '1)))) + (return (AsList + (EraseRow 'Paths key) + )) + ))___", {}, {}, /* dryRun */ true); + auto result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result (dry run EraseRow):\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + + future = ExecuteMiniKQL(runtime, schemeShardId, R"___(( + (let key '('('Id (Uint64 '1)))) + (let select '('Id 'Name)) + (return (AsList + (SetResult 'row (SelectRow 'Paths key select)) + )) + ))___"); + result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result (SelectRow):\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + UNIT_ASSERT_VALUES_EQUAL_C( + result.result().value().ShortDebugString(), + "items { items { uint64_value: 1 } items { text_value: \"Root\" } }", + result.DebugString()); + + // Repeat request without dry_run + future = ExecuteMiniKQL(runtime, schemeShardId, R"___(( + (let key '('('Id (Uint64 '1)))) + (return (AsList + (EraseRow 'Paths key) + )) + ))___"); + result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result (EraseRow):\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + + future = ExecuteMiniKQL(runtime, schemeShardId, R"___(( + (let key '('('Id (Uint64 '1)))) + (let select '('Id 'Name)) + (return (AsList + (SetResult 'row (SelectRow 'Paths key select)) + )) + ))___"); + result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result (SelectRow):\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + UNIT_ASSERT_VALUES_EQUAL_C( + result.result().value().ShortDebugString(), + "items { nested_value { null_flag_value: NULL_VALUE } }", + result.DebugString()); + } + + Y_UNIT_TEST(OnlyAdminsAllowed) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + runtime.GetAppData().AdministrationAllowedSIDs.push_back("root@builtin"); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + + ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain); + auto future = ExecuteMiniKQL(runtime, schemeShardId, R"___(( + (let key '('('Id (Uint64 '1)))) + (let select '('Id 'Name)) + (return (AsList + (SetResult 'row (SelectRow 'Paths key select)) + )) + ))___"); + auto result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result:\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::UNAUTHORIZED, result.DebugString()); + + future = ExecuteMiniKQL(runtime, schemeShardId, R"___(( + (let key '('('Id (Uint64 '1)))) + (let select '('Id 'Name)) + (return (AsList + (SetResult 'row (SelectRow 'Paths key select)) + )) + ))___", {}, NACLib::TUserToken("user@builtin", {}).SerializeAsString()); + result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result:\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::UNAUTHORIZED, result.DebugString()); + + future = ExecuteMiniKQL(runtime, schemeShardId, R"___(( + (let key '('('Id (Uint64 '1)))) + (let select '('Id 'Name)) + (return (AsList + (SetResult 'row (SelectRow 'Paths key select)) + )) + ))___", {}, NACLib::TUserToken("root@builtin", {}).SerializeAsString()); + result = runtime.WaitFuture(std::move(future)); + // Cerr << "Got result:\n" << result.DebugString(); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + UNIT_ASSERT_VALUES_EQUAL_C( + result.result().value().ShortDebugString(), + "items { items { uint64_value: 1 } items { text_value: \"Root\" } }", + result.DebugString()); + } + +} // Y_UNIT_TEST_SUITE(TabletService_ExecuteMiniKQL) + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/tablet/rpc_restart_tablet.cpp b/ydb/core/grpc_services/tablet/rpc_restart_tablet.cpp new file mode 100644 index 0000000000..3b1d91d252 --- /dev/null +++ b/ydb/core/grpc_services/tablet/rpc_restart_tablet.cpp @@ -0,0 +1,102 @@ +#include "rpc_restart_tablet.h" +#include "service_tablet.h" + +#include <ydb/core/grpc_services/rpc_request_base.h> +#include <ydb/core/base/tablet_pipe.h> + +namespace NKikimr::NGRpcService { + +class TRpcRestartTablet : public TRpcRequestActor<TRpcRestartTablet, TEvRestartTabletRequest> { + using TBase = TRpcRequestActor<TRpcRestartTablet, TEvRestartTabletRequest>; + +public: + using TBase::TBase; + + void Bootstrap() { + if (!CheckAccess()) { + auto error = TStringBuilder() << "Access denied"; + if (this->UserToken) { + error << ": '" << this->UserToken->GetUserSID() << "' is not an admin"; + } + + this->Reply(Ydb::StatusIds::UNAUTHORIZED, NKikimrIssues::TIssuesIds::ACCESS_DENIED, error); + return; + } + + auto* req = this->GetProtoRequest(); + TabletId = req->tablet_id(); + PipeClient = RegisterWithSameMailbox(NTabletPipe::CreateClient(SelfId(), TabletId, NTabletPipe::TClientRetryPolicy{ + // We need at least one retry since local resolver cache may be outdated + .RetryLimitCount = 1, + })); + + Schedule(TDuration::Seconds(60), new TEvents::TEvWakeup); + + Become(&TThis::StateWork); + } + +private: + bool CheckAccess() const { + if (AppData()->AdministrationAllowedSIDs.empty()) { + return true; + } + + if (!this->UserToken) { + return false; + } + + for (const auto& sid : AppData()->AdministrationAllowedSIDs) { + if (this->UserToken->IsExist(sid)) { + return true; + } + } + + return false; + } + +private: + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvTabletPipe::TEvClientConnected, Handle); + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + hFunc(TEvents::TEvWakeup, Handle); + } + } + + void Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev) { + auto* msg = ev->Get(); + if (msg->Status != NKikimrProto::OK) { + this->Reply(Ydb::StatusIds::UNAVAILABLE, + TStringBuilder() << "Tablet " << TabletId << " is unavailable"); + return; + } + + // Note: we send the poison message and wait for the pipe to close + NTabletPipe::SendData(SelfId(), PipeClient, new TEvents::TEvPoison); + } + + void Handle(TEvTabletPipe::TEvClientDestroyed::TPtr&) { + this->Reply(Ydb::StatusIds::SUCCESS); + } + + void Handle(TEvents::TEvWakeup::TPtr&) { + NTabletPipe::CloseClient(SelfId(), PipeClient); + this->Reply(Ydb::StatusIds::TIMEOUT, + TStringBuilder() << "Tablet " << TabletId << " is not responding"); + } + +private: + ui64 TabletId; + TActorId PipeClient; +}; + +void DoRestartTabletRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) { + f.RegisterActor(new TRpcRestartTablet(p.release())); +} + +template<> +IActor* TEvRestartTabletRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestNoOpCtx* msg) { + return new TRpcRestartTablet(msg); +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/tablet/rpc_restart_tablet.h b/ydb/core/grpc_services/tablet/rpc_restart_tablet.h new file mode 100644 index 0000000000..f4615e9cee --- /dev/null +++ b/ydb/core/grpc_services/tablet/rpc_restart_tablet.h @@ -0,0 +1,11 @@ +#pragma once +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/public/api/protos/draft/ydb_tablet.pb.h> + +namespace NKikimr::NGRpcService { + +using TEvRestartTabletRequest = TGrpcRequestNoOperationCall< + Ydb::Tablet::RestartTabletRequest, + Ydb::Tablet::RestartTabletResponse>; + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/tablet/rpc_restart_tablet_ut.cpp b/ydb/core/grpc_services/tablet/rpc_restart_tablet_ut.cpp new file mode 100644 index 0000000000..4b585fcf7d --- /dev/null +++ b/ydb/core/grpc_services/tablet/rpc_restart_tablet_ut.cpp @@ -0,0 +1,97 @@ +#include "rpc_restart_tablet.h" +#include <ydb/core/testlib/test_client.h> +#include <ydb/core/testlib/tablet_helpers.h> +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> + +#include <library/cpp/testing/unittest/registar.h> + +namespace NKikimr::NGRpcService { + +using namespace Tests; + +Y_UNIT_TEST_SUITE(TabletService_Restart) { + + NThreading::TFuture<Ydb::Tablet::RestartTabletResponse> RestartRpc( + TTestActorRuntime& runtime, ui64 tabletId, + const TString& token = {}) + { + Ydb::Tablet::RestartTabletRequest request; + request.set_tablet_id(tabletId); + return NRpcService::DoLocalRpc<TEvRestartTabletRequest>( + std::move(request), "/Root", token, runtime.GetActorSystem(0)); + } + + Y_UNIT_TEST(Basics) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + + ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain); + auto actorBefore = ResolveTablet(runtime, schemeShardId); + + Cerr << "... restarting tablet " << schemeShardId << Endl; + auto future = RestartRpc(runtime, schemeShardId); + auto result = runtime.WaitFuture(std::move(future)); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + + runtime.SimulateSleep(TDuration::Seconds(1)); + InvalidateTabletResolverCache(runtime, schemeShardId); + auto actorAfter = ResolveTablet(runtime, schemeShardId); + + UNIT_ASSERT_C(actorBefore != actorAfter, "SchemeShard actor " << actorBefore << " didn't change"); + } + + Y_UNIT_TEST(OnlyAdminsAllowed) { + TPortManager pm; + TServerSettings serverSettings(pm.GetPort(2134)); + serverSettings.SetDomainName("Root") + .SetUseRealThreads(false); + + Tests::TServer::TPtr server = new TServer(serverSettings); + auto& runtime = *server->GetRuntime(); + runtime.GetAppData().AdministrationAllowedSIDs.push_back("root@builtin"); + + auto sender = runtime.AllocateEdgeActor(); + server->SetupRootStoragePools(sender); + + ui64 schemeShardId = ChangeStateStorage(Tests::SchemeRoot, server->GetSettings().Domain); + auto actorBefore = ResolveTablet(runtime, schemeShardId); + + Cerr << "... restarting tablet " << schemeShardId << " (without token)" << Endl; + auto future = RestartRpc(runtime, schemeShardId); + auto result = runtime.WaitFuture(std::move(future)); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::UNAUTHORIZED, result.DebugString()); + + Cerr << "... restarting tablet " << schemeShardId << " (non-admin token)" << Endl; + future = RestartRpc(runtime, schemeShardId, NACLib::TUserToken("user@builtin", {}).SerializeAsString()); + result = runtime.WaitFuture(std::move(future)); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::UNAUTHORIZED, result.DebugString()); + + runtime.SimulateSleep(TDuration::Seconds(1)); + InvalidateTabletResolverCache(runtime, schemeShardId); + auto actorNoRestart = ResolveTablet(runtime, schemeShardId); + + UNIT_ASSERT_C(actorBefore == actorNoRestart, "SchemeShard actor " << actorBefore << " changed to " << actorNoRestart); + + Cerr << "... restarting tablet " << schemeShardId << " (admin token)" << Endl; + future = RestartRpc(runtime, schemeShardId, NACLib::TUserToken("root@builtin", {}).SerializeAsString()); + result = runtime.WaitFuture(std::move(future)); + UNIT_ASSERT_VALUES_EQUAL_C(result.status(), Ydb::StatusIds::SUCCESS, result.DebugString()); + + runtime.SimulateSleep(TDuration::Seconds(1)); + InvalidateTabletResolverCache(runtime, schemeShardId); + auto actorAfter = ResolveTablet(runtime, schemeShardId); + + UNIT_ASSERT_C(actorBefore != actorAfter, "SchemeShard actor " << actorBefore << " didn't change"); + } + +} // Y_UNIT_TEST_SUITE(TabletService_Restart) + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/tablet/service_tablet.h b/ydb/core/grpc_services/tablet/service_tablet.h new file mode 100644 index 0000000000..07b494d783 --- /dev/null +++ b/ydb/core/grpc_services/tablet/service_tablet.h @@ -0,0 +1,15 @@ +#pragma once + +#include <memory> + +namespace NKikimr::NGRpcService { + +class IRequestOpCtx; +class IRequestNoOpCtx; +class IFacilityProvider; + +void DoExecuteTabletMiniKQLRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f); +void DoChangeTabletSchemaRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f); +void DoRestartTabletRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f); + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/tablet/ut/ya.make b/ydb/core/grpc_services/tablet/ut/ya.make new file mode 100644 index 0000000000..077370dd8c --- /dev/null +++ b/ydb/core/grpc_services/tablet/ut/ya.make @@ -0,0 +1,18 @@ +UNITTEST_FOR(ydb/core/grpc_services/tablet) + +SIZE(MEDIUM) + +SRCS( + rpc_change_schema_ut.cpp + rpc_execute_mkql_ut.cpp + rpc_restart_tablet_ut.cpp +) + +PEERDIR( + ydb/core/testlib/default + ydb/core/grpc_services/local_rpc +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/grpc_services/tablet/ya.make b/ydb/core/grpc_services/tablet/ya.make new file mode 100644 index 0000000000..7c832b6ad3 --- /dev/null +++ b/ydb/core/grpc_services/tablet/ya.make @@ -0,0 +1,28 @@ +LIBRARY() + +SRCS( + rpc_change_schema.cpp + rpc_execute_mkql.cpp + rpc_restart_tablet.cpp + service_tablet.h +) + +PEERDIR( + ydb/core/base + ydb/core/grpc_services + ydb/core/grpc_services/base + ydb/core/protos + ydb/library/mkql_proto + ydb/library/yql/minikql + ydb/library/yql/minikql/computation + ydb/public/api/protos + library/cpp/protobuf/json +) + +YQL_LAST_ABI_VERSION() + +END() + +RECURSE_FOR_TESTS( + ut +) diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index 63344c31d9..7b9eb6b92f 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -152,6 +152,7 @@ RECURSE( base counters local_rpc + tablet ) RECURSE_FOR_TESTS( diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index edfb1904ce..8efd3790b3 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -29,6 +29,7 @@ #include <ydb/services/ydb/ydb_scripting.h> #include <ydb/services/ydb/ydb_table.h> #include <ydb/services/ydb/ydb_logstore.h> +#include <ydb/services/tablet/ydb_tablet.h> #include <ydb/services/discovery/grpc_service.h> #include <ydb/services/rate_limiter/grpc_service.h> #include <ydb/services/persqueue_cluster_discovery/grpc_service.h> @@ -429,6 +430,7 @@ namespace Tests { GRpcServer->AddService(new NGRpcService::TGRpcYmqService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxies[0], true)); GRpcServer->AddService(new NGRpcService::TGRpcYdbQueryService(system, counters, grpcRequestProxies, true, 1)); + GRpcServer->AddService(new NGRpcService::TGRpcYdbTabletService(system, counters, grpcRequestProxies, true, 1)); if (Settings->EnableYq) { GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxies[0])); GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxies[0])); diff --git a/ydb/core/testlib/ya.make b/ydb/core/testlib/ya.make index 5a63f36a63..4d7fab765f 100644 --- a/ydb/core/testlib/ya.make +++ b/ydb/core/testlib/ya.make @@ -111,6 +111,7 @@ PEERDIR( ydb/services/replication ydb/services/monitoring ydb/services/metadata/ds_table + ydb/services/tablet ydb/services/ydb ydb/core/http_proxy diff --git a/ydb/services/tablet/ya.make b/ydb/services/tablet/ya.make new file mode 100644 index 0000000000..a525f7899d --- /dev/null +++ b/ydb/services/tablet/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + ydb_tablet.cpp +) + +PEERDIR( + ydb/library/grpc/server + ydb/public/api/grpc/draft + ydb/core/grpc_services + ydb/core/grpc_services/base + ydb/core/grpc_services/tablet +) + +END() diff --git a/ydb/services/tablet/ydb_tablet.cpp b/ydb/services/tablet/ydb_tablet.cpp new file mode 100644 index 0000000000..f6e78a3c4e --- /dev/null +++ b/ydb/services/tablet/ydb_tablet.cpp @@ -0,0 +1,55 @@ +#include "ydb_tablet.h" + +#include <ydb/core/grpc_services/tablet/service_tablet.h> +#include <ydb/core/grpc_services/grpc_helper.h> +#include <ydb/core/grpc_services/base/base.h> + +namespace NKikimr::NGRpcService { + +TGRpcYdbTabletService::TGRpcYdbTabletService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector<NActors::TActorId>& proxies, + bool rlAllowed, + size_t handlersPerCompletionQueue) + : TBase(system, counters, proxies, rlAllowed) + , HandlersPerCompletionQueue(handlersPerCompletionQueue) +{} + +void TGRpcYdbTabletService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { + auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + + size_t proxyCounter = 0; + +#ifdef ADD_REQUEST_LIMIT +#error ADD_REQUEST_LIMIT macro already defined +#endif + +#define ADD_REQUEST_LIMIT(NAME, CB, LIMIT_TYPE, ...) do { \ + for (size_t i = 0; i < HandlersPerCompletionQueue; ++i) { \ + for (auto* cq: CQS) { \ + auto proxy = GRpcProxies_[proxyCounter++ % GRpcProxies_.size()]; \ + MakeIntrusive<TGRpcRequest<Ydb::Tablet::NAME##Request, Ydb::Tablet::NAME##Response, TGRpcYdbTabletService>> \ + (this, &Service_, cq, \ + [this, proxy](NYdbGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(proxy, \ + new TGrpcRequestNoOperationCall<Ydb::Tablet::NAME##Request, Ydb::Tablet::NAME##Response> \ + (ctx, &CB, TRequestAuxSettings { \ + .RlMode = RLSWITCH(TRateLimiterMode::LIMIT_TYPE), \ + __VA_OPT__(.AuditMode = TAuditMode::__VA_ARGS__,) \ + })); \ + }, &Ydb::Tablet::V1::TabletService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("tablet", #NAME))->Run(); \ + } \ + } \ +} while(0) + + ADD_REQUEST_LIMIT(ExecuteTabletMiniKQL, DoExecuteTabletMiniKQLRequest, Rps, Auditable); + ADD_REQUEST_LIMIT(ChangeTabletSchema, DoChangeTabletSchemaRequest, Rps, Auditable); + ADD_REQUEST_LIMIT(RestartTablet, DoRestartTabletRequest, Rps); + +#undef ADD_REQUEST_LIMIT +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/services/tablet/ydb_tablet.h b/ydb/services/tablet/ydb_tablet.h new file mode 100644 index 0000000000..638314c495 --- /dev/null +++ b/ydb/services/tablet/ydb_tablet.h @@ -0,0 +1,31 @@ +#pragma once + +#include <ydb/library/actors/core/actorsystem.h> +#include <ydb/library/grpc/server/grpc_server.h> +#include <ydb/core/grpc_services/base/base_service.h> + +#include <ydb/public/api/grpc/draft/ydb_tablet_v1.grpc.pb.h> + +namespace NKikimr::NGRpcService { + +class TGRpcYdbTabletService + : public TGrpcServiceBase<Ydb::Tablet::V1::TabletService> +{ + using TBase = TGrpcServiceBase<Ydb::Tablet::V1::TabletService>; + +public: + TGRpcYdbTabletService( + NActors::TActorSystem *system, + TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, + const TVector<NActors::TActorId>& proxies, + bool rlAllowed, + size_t handlersPerCompletionQueue = 1); + +private: + void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); + +private: + const size_t HandlersPerCompletionQueue; +}; + +} // namespace NKikimr::NGRpcService diff --git a/ydb/services/ya.make b/ydb/services/ya.make index 636feb6111..fc40a9c574 100644 --- a/ydb/services/ya.make +++ b/ydb/services/ya.make @@ -20,6 +20,7 @@ RECURSE( persqueue_v1 rate_limiter replication + tablet ydb ymq ) |