aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-15 19:45:03 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-02-15 19:45:03 +0300
commit3ca0e4d20361f13ade902c78a87faa4da73f44c0 (patch)
tree189a5c1686ab70d58511d050f177e7c652627778
parent477261f1113065d578e21c21933da7ae432d0afd (diff)
downloadydb-3ca0e4d20361f13ade902c78a87faa4da73f44c0.tar.gz
Invert grpc_request_proxy dependence for table service. KIKIMR-13646
ref:0b126dafc321557e232e9131321cc8ffa4790f9a
-rw-r--r--ydb/core/blobstorage/testload/test_load_kqp.cpp6
-rw-r--r--ydb/core/grpc_services/base/base.h17
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp21
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.h21
-rw-r--r--ydb/core/grpc_services/local_rpc/local_rpc.h15
-rw-r--r--ydb/core/grpc_services/rpc_alter_table.cpp12
-rw-r--r--ydb/core/grpc_services/rpc_begin_transaction.cpp13
-rw-r--r--ydb/core/grpc_services/rpc_calls.h21
-rw-r--r--ydb/core/grpc_services/rpc_commit_transaction.cpp13
-rw-r--r--ydb/core/grpc_services/rpc_common.h1
-rw-r--r--ydb/core/grpc_services/rpc_copy_table.cpp12
-rw-r--r--ydb/core/grpc_services/rpc_copy_tables.cpp12
-rw-r--r--ydb/core/grpc_services/rpc_create_session.cpp17
-rw-r--r--ydb/core/grpc_services/rpc_create_table.cpp13
-rw-r--r--ydb/core/grpc_services/rpc_delete_session.cpp11
-rw-r--r--ydb/core/grpc_services/rpc_describe_table.cpp13
-rw-r--r--ydb/core/grpc_services/rpc_describe_table_options.cpp13
-rw-r--r--ydb/core/grpc_services/rpc_drop_table.cpp11
-rw-r--r--ydb/core/grpc_services/rpc_execute_data_query.cpp17
-rw-r--r--ydb/core/grpc_services/rpc_execute_scheme_query.cpp11
-rw-r--r--ydb/core/grpc_services/rpc_explain_data_query.cpp13
-rw-r--r--ydb/core/grpc_services/rpc_keep_alive.cpp14
-rw-r--r--ydb/core/grpc_services/rpc_load_rows.cpp32
-rw-r--r--ydb/core/grpc_services/rpc_prepare_data_query.cpp14
-rw-r--r--ydb/core/grpc_services/rpc_read_table.cpp17
-rw-r--r--ydb/core/grpc_services/rpc_rename_tables.cpp13
-rw-r--r--ydb/core/grpc_services/rpc_rollback_transaction.cpp13
-rw-r--r--ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp17
-rw-r--r--ydb/core/grpc_services/service_table.h35
-rw-r--r--ydb/core/kqp/kqp_ic_gateway.cpp6
-rw-r--r--ydb/services/ydb/ydb_table.cpp144
31 files changed, 334 insertions, 254 deletions
diff --git a/ydb/core/blobstorage/testload/test_load_kqp.cpp b/ydb/core/blobstorage/testload/test_load_kqp.cpp
index fec4fd7b2c0..89e04ed68de 100644
--- a/ydb/core/blobstorage/testload/test_load_kqp.cpp
+++ b/ydb/core/blobstorage/testload/test_load_kqp.cpp
@@ -223,7 +223,8 @@ private:
}
};
using namespace NGRpcService;
- using TEvCreateSessionRequest = TGRpcRequestWrapper<TRpcServices::EvCreateSession, Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse, true, TRateLimiterMode::Rps>;
+ using TEvCreateSessionRequest = TGrpcRequestOperationCall<Ydb::Table::CreateSessionRequest,
+ Ydb::Table::CreateSessionResponse>;
NKikimr::NRpcService::DoLocalRpcSameMailbox<TEvCreateSessionRequest>(
std::move(request), std::move(cb), WorkingDir, TString(), ctx
);
@@ -247,7 +248,8 @@ private:
}
};
using namespace NGRpcService;
- using TEvExecuteSchemeQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExecuteSchemeQuery, Ydb::Table::ExecuteSchemeQueryRequest, Ydb::Table::ExecuteSchemeQueryResponse, true, TRateLimiterMode::Rps>;
+ using TEvExecuteSchemeQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteSchemeQueryRequest,
+ Ydb::Table::ExecuteSchemeQueryResponse>;
NKikimr::NRpcService::DoLocalRpcSameMailbox<TEvExecuteSchemeQueryRequest>(
std::move(request), std::move(cb), WorkingDir, TString(), ctx
);
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 44b25c4a5f9..38380586218 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -360,6 +360,11 @@ public:
virtual void AddServerHint(const TString& hint) = 0;
virtual void SetCostInfo(float consumed_units) = 0;
+ virtual void SetStreamingNotify(NGrpc::IRequestContextBase::TOnNextReply&& cb) = 0;
+ virtual void FinishStream() = 0;
+
+ virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) = 0;
+
private:
virtual void Reply(NProtoBuf::Message* resp, ui32 status = 0) = 0;
};
@@ -378,6 +383,7 @@ public:
};
class IRequestNoOpCtx : public IRequestCtx {
+
};
struct TCommonResponseFillerImpl {
@@ -914,7 +920,7 @@ public:
return GetPeerMetaValues(NYdb::YDB_REQUEST_TYPE_HEADER);
}
- void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) {
+ void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override {
// res->data() pointer is used inside grpc code.
// So this object should be destroyed during grpc_slice destroying routine
auto res = new TString;
@@ -954,9 +960,8 @@ public:
return google::protobuf::Arena::CreateMessage<TResult>(ctx->GetArena());
}
- template<typename Tcb>
- void SetStreamingNotify(Tcb&& cb) {
- Ctx_->SetNextReplyCallback(cb);
+ void SetStreamingNotify(NGrpc::IRequestContextBase::TOnNextReply&& cb) override {
+ Ctx_->SetNextReplyCallback(std::move(cb));
}
void SetClientLostAction(std::function<void()>&& cb) override {
@@ -969,7 +974,7 @@ public:
Ctx_->GetFinishFuture().Subscribe(std::move(shutdown));
}
- void FinishStream() {
+ void FinishStream() override {
Ctx_->FinishStreamingOk();
}
@@ -1033,6 +1038,8 @@ class TGrpcRequestCall
typedef typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type TRequestIface;
public:
static constexpr bool IsOp = IsOperation;
+ static IActor* CreateRpcActor(typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type* msg);
+
TGrpcRequestCall(NGrpc::IRequestContextBase* ctx,
void (*cb)(std::unique_ptr<TRequestIface>, const IFacilityProvider&), TRequestAuxSettings auxSettings = {})
: TGRpcRequestWrapperImpl<
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index a1e7accb877..358570e4e76 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -566,36 +566,17 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo
switch (ev->GetTypeRewrite()) {
HFunc(TRefreshTokenImpl, PreHandle);
HFunc(TEvLoginRequest, PreHandle);
- HFunc(TEvAlterTableRequest, PreHandle);
- HFunc(TEvCreateTableRequest, PreHandle);
- HFunc(TEvDropTableRequest, PreHandle);
HFunc(TEvGetOperationRequest, PreHandle);
HFunc(TEvCancelOperationRequest, PreHandle);
HFunc(TEvForgetOperationRequest, PreHandle);
HFunc(TEvListOperationsRequest, PreHandle);
- HFunc(TEvCreateSessionRequest, PreHandle);
- HFunc(TEvKeepAliveRequest, PreHandle);
- HFunc(TEvDeleteSessionRequest, PreHandle);
- HFunc(TEvCopyTableRequest, PreHandle);
- HFunc(TEvCopyTablesRequest, PreHandle);
- HFunc(TEvRenameTablesRequest, PreHandle);
- HFunc(TEvDescribeTableRequest, PreHandle);
- HFunc(TEvReadTableRequest, PreHandle);
- HFunc(TEvExplainDataQueryRequest, PreHandle);
- HFunc(TEvPrepareDataQueryRequest, PreHandle);
- HFunc(TEvExecuteDataQueryRequest, PreHandle);
- HFunc(TEvExecuteSchemeQueryRequest, PreHandle);
HFunc(TEvCreateTenantRequest, PreHandle);
HFunc(TEvAlterTenantRequest, PreHandle);
HFunc(TEvGetTenantStatusRequest, PreHandle);
HFunc(TEvListTenantsRequest, PreHandle);
HFunc(TEvRemoveTenantRequest, PreHandle);
- HFunc(TEvBeginTransactionRequest, PreHandle);
- HFunc(TEvCommitTransactionRequest, PreHandle);
- HFunc(TEvRollbackTransactionRequest, PreHandle);
HFunc(TEvListEndpointsRequest, PreHandle);
HFunc(TEvDescribeTenantOptionsRequest, PreHandle);
- HFunc(TEvDescribeTableOptionsRequest, PreHandle);
HFunc(TEvCreateCoordinationNode, PreHandle);
HFunc(TEvAlterCoordinationNode, PreHandle);
HFunc(TEvDropCoordinationNode, PreHandle);
@@ -620,7 +601,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo
HFunc(TEvImportFromS3Request, PreHandle);
HFunc(TEvImportDataRequest, PreHandle);
HFunc(TEvDiscoverPQClustersRequest, PreHandle);
- HFunc(TEvBulkUpsertRequest, PreHandle);
HFunc(TEvWhoAmIRequest, PreHandle);
HFunc(TEvCreateRateLimiterResource, PreHandle);
HFunc(TEvAlterRateLimiterResource, PreHandle);
@@ -632,7 +612,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo
HFunc(TEvKikhouseRefreshSnapshotRequest, PreHandle);
HFunc(TEvKikhouseDiscardSnapshotRequest, PreHandle);
HFunc(TEvSelfCheckRequest, PreHandle);
- HFunc(TEvStreamExecuteScanQueryRequest, PreHandle);
HFunc(TEvCoordinationSessionRequest, PreHandle);
HFunc(TEvLongTxBeginRequest, PreHandle);
HFunc(TEvLongTxCommitRequest, PreHandle);
diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h
index ac65990f67f..508f678330f 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.h
+++ b/ydb/core/grpc_services/grpc_request_proxy.h
@@ -48,36 +48,17 @@ public:
};
protected:
- void Handle(TEvAlterTableRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvCreateTableRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDropTableRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvCopyTableRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvCopyTablesRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvRenameTablesRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDescribeTableRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvGetOperationRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvCancelOperationRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvForgetOperationRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvListOperationsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvCreateSessionRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvKeepAliveRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDeleteSessionRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvReadTableRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvExplainDataQueryRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvPrepareDataQueryRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvExecuteDataQueryRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvExecuteSchemeQueryRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvCreateTenantRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvAlterTenantRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvGetTenantStatusRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvListTenantsRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvRemoveTenantRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvBeginTransactionRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvCommitTransactionRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvRollbackTransactionRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvListEndpointsRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDescribeTenantOptionsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDescribeTableOptionsRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvCreateCoordinationNode::TPtr& ev, const TActorContext& ctx);
void Handle(TEvAlterCoordinationNode::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDropCoordinationNode::TPtr& ev, const TActorContext& ctx);
@@ -102,7 +83,6 @@ protected:
void Handle(TEvImportFromS3Request::TPtr& ev, const TActorContext& ctx);
void Handle(TEvImportDataRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvDiscoverPQClustersRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvBulkUpsertRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvWhoAmIRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvCreateRateLimiterResource::TPtr& ev, const TActorContext& ctx);
void Handle(TEvAlterRateLimiterResource::TPtr& ev, const TActorContext& ctx);
@@ -115,7 +95,6 @@ protected:
void Handle(TEvKikhouseDiscardSnapshotRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvSelfCheckRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvLoginRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvStreamExecuteScanQueryRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvCoordinationSessionRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvLongTxBeginRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvLongTxCommitRequest::TPtr& ev, const TActorContext& ctx);
diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h
index 23c3b6c8035..4f95c495409 100644
--- a/ydb/core/grpc_services/local_rpc/local_rpc.h
+++ b/ydb/core/grpc_services/local_rpc/local_rpc.h
@@ -145,6 +145,21 @@ public:
Y_FAIL("Unimplemented for local rpc");
}
+ void SetStreamingNotify(NGrpc::IRequestContextBase::TOnNextReply&& cb) override {
+ Y_UNUSED(cb);
+ Y_FAIL("Unimplemented for local rpc");
+ }
+
+ void FinishStream() override {
+ Y_FAIL("Unimplemented for local rpc");
+ }
+
+ virtual void SendSerializedResult(TString&& in, Ydb::StatusIds::StatusCode status) override {
+ Y_UNUSED(in);
+ Y_UNUSED(status);
+ Y_FAIL("Unimplemented for local rpc");
+ }
+
TMaybe<TString> GetTraceId() const override {
return Nothing();
}
diff --git a/ydb/core/grpc_services/rpc_alter_table.cpp b/ydb/core/grpc_services/rpc_alter_table.cpp
index d1f2755b66d..6d403250733 100644
--- a/ydb/core/grpc_services/rpc_alter_table.cpp
+++ b/ydb/core/grpc_services/rpc_alter_table.cpp
@@ -1,9 +1,11 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_scheme_base.h"
#include "rpc_common.h"
#include "operation_helpers.h"
#include "table_settings.h"
+#include "service_table.h"
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/cms/console/configs_dispatcher.h>
@@ -77,6 +79,9 @@ static std::pair<StatusIds::StatusCode, TString> CheckAddIndexDesc(const Ydb::Ta
return {StatusIds::SUCCESS, ""};
}
+using TEvAlterTableRequest = TGrpcRequestOperationCall<Ydb::Table::AlterTableRequest,
+ Ydb::Table::AlterTableResponse>;
+
class TAlterTableRPC : public TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTableRequest> {
using TBase = TRpcSchemeRequestActor<TAlterTableRPC, TEvAlterTableRequest>;
@@ -537,8 +542,8 @@ private:
TTableProfiles Profiles;
};
-void TGRpcRequestProxy::Handle(TEvAlterTableRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TAlterTableRPC(ev->Release().Release()));
+void DoAlterTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TAlterTableRPC(p.release()));
}
template<>
@@ -546,6 +551,5 @@ IActor* TEvAlterTableRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCt
return new TAlterTableRPC(msg);
}
-
} // namespace NKikimr
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_begin_transaction.cpp b/ydb/core/grpc_services/rpc_begin_transaction.cpp
index 9e77fafdd3e..d042e9c3eee 100644
--- a/ydb/core/grpc_services/rpc_begin_transaction.cpp
+++ b/ydb/core/grpc_services/rpc_begin_transaction.cpp
@@ -1,8 +1,10 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_calls.h"
#include "rpc_kqp_base.h"
#include "rpc_common.h"
+#include "service_table.h"
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -14,13 +16,16 @@ using namespace NActors;
using namespace Ydb;
using namespace NKqp;
+using TEvBeginTransactionRequest = TGrpcRequestOperationCall<Ydb::Table::BeginTransactionRequest,
+ Ydb::Table::BeginTransactionResponse>;
+
class TBeginTransactionRPC : public TRpcKqpRequestActor<TBeginTransactionRPC, TEvBeginTransactionRequest> {
using TBase = TRpcKqpRequestActor<TBeginTransactionRPC, TEvBeginTransactionRequest>;
public:
using TResult = Ydb::Table::BeginTransactionResult;
- TBeginTransactionRPC(TEvBeginTransactionRequest* msg)
+ TBeginTransactionRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext& ctx) {
@@ -104,8 +109,8 @@ private:
}
};
-void TGRpcRequestProxy::Handle(TEvBeginTransactionRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TBeginTransactionRPC(ev->Release().Release()));
+void DoBeginTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TBeginTransactionRPC(p.release()));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h
index 43136c2f4ad..4fa6b5b38ac 100644
--- a/ydb/core/grpc_services/rpc_calls.h
+++ b/ydb/core/grpc_services/rpc_calls.h
@@ -45,28 +45,10 @@ void FillYdbStatus(Draft::Dummy::PingResponse& resp, const NYql::TIssues& issues
template <>
void FillYdbStatus(Ydb::Coordination::SessionResponse& resp, const NYql::TIssues& issues, Ydb::StatusIds::StatusCode status);
-using TEvAlterTableRequest = TGRpcRequestValidationWrapper<TRpcServices::EvAlterTable, Ydb::Table::AlterTableRequest, Ydb::Table::AlterTableResponse, true, TRateLimiterMode::Rps>;
-using TEvCreateTableRequest = TGRpcRequestValidationWrapper<TRpcServices::EvCreateTable, Ydb::Table::CreateTableRequest, Ydb::Table::CreateTableResponse, true, TRateLimiterMode::Rps>;
-using TEvDropTableRequest = TGRpcRequestWrapper<TRpcServices::EvDropTable, Ydb::Table::DropTableRequest, Ydb::Table::DropTableResponse, true, TRateLimiterMode::Rps>;
-using TEvCopyTableRequest = TGRpcRequestWrapper<TRpcServices::EvCopyTable, Ydb::Table::CopyTableRequest, Ydb::Table::CopyTableResponse, true, TRateLimiterMode::Rps>;
-using TEvCopyTablesRequest = TGRpcRequestWrapper<TRpcServices::EvCopyTables, Ydb::Table::CopyTablesRequest, Ydb::Table::CopyTablesResponse, true, TRateLimiterMode::Rps>;
-using TEvRenameTablesRequest = TGRpcRequestWrapper<TRpcServices::EvRenameTables, Ydb::Table::RenameTablesRequest, Ydb::Table::RenameTablesResponse, true, TRateLimiterMode::Rps>;
-using TEvDescribeTableRequest = TGRpcRequestWrapper<TRpcServices::EvDescribeTable, Ydb::Table::DescribeTableRequest, Ydb::Table::DescribeTableResponse, true, TRateLimiterMode::Rps>;
using TEvGetOperationRequest = TGRpcRequestValidationWrapper<TRpcServices::EvGetOperation, Ydb::Operations::GetOperationRequest, Ydb::Operations::GetOperationResponse, true, TRateLimiterMode::Rps>;
using TEvCancelOperationRequest = TGRpcRequestValidationWrapper<TRpcServices::EvCancelOperation, Ydb::Operations::CancelOperationRequest, Ydb::Operations::CancelOperationResponse, false, TRateLimiterMode::Rps>;
using TEvForgetOperationRequest = TGRpcRequestValidationWrapper<TRpcServices::EvForgetOperation, Ydb::Operations::ForgetOperationRequest, Ydb::Operations::ForgetOperationResponse, false, TRateLimiterMode::Rps>;
using TEvListOperationsRequest = TGRpcRequestValidationWrapper<TRpcServices::EvListOperations, Ydb::Operations::ListOperationsRequest, Ydb::Operations::ListOperationsResponse, false, TRateLimiterMode::Rps>;
-using TEvCreateSessionRequest = TGRpcRequestWrapper<TRpcServices::EvCreateSession, Ydb::Table::CreateSessionRequest, Ydb::Table::CreateSessionResponse, true, TRateLimiterMode::Rps>;
-using TEvDeleteSessionRequest = TGRpcRequestWrapper<TRpcServices::EvDeleteSession, Ydb::Table::DeleteSessionRequest, Ydb::Table::DeleteSessionResponse, true>;
-using TEvKeepAliveRequest = TGRpcRequestWrapper<TRpcServices::EvKeepAlive, Ydb::Table::KeepAliveRequest, Ydb::Table::KeepAliveResponse, true, TRateLimiterMode::Rps>;
-using TEvReadTableRequest = TGRpcRequestWrapper<TRpcServices::EvReadTable, Ydb::Table::ReadTableRequest, Ydb::Table::ReadTableResponse, false, TRateLimiterMode::RuOnProgress>;
-using TEvExplainDataQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExplainDataQuery, Ydb::Table::ExplainDataQueryRequest, Ydb::Table::ExplainDataQueryResponse, true, TRateLimiterMode::Rps>;
-using TEvPrepareDataQueryRequest = TGRpcRequestWrapper<TRpcServices::EvPrepareDataQuery, Ydb::Table::PrepareDataQueryRequest, Ydb::Table::PrepareDataQueryResponse, true, TRateLimiterMode::Ru>;
-using TEvExecuteDataQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExecuteDataQuery, Ydb::Table::ExecuteDataQueryRequest, Ydb::Table::ExecuteDataQueryResponse, true, TRateLimiterMode::Ru>;
-using TEvExecuteSchemeQueryRequest = TGRpcRequestWrapper<TRpcServices::EvExecuteSchemeQuery, Ydb::Table::ExecuteSchemeQueryRequest, Ydb::Table::ExecuteSchemeQueryResponse, true, TRateLimiterMode::Rps>;
-using TEvBeginTransactionRequest = TGRpcRequestWrapper<TRpcServices::EvBeginTransaction, Ydb::Table::BeginTransactionRequest, Ydb::Table::BeginTransactionResponse, true, TRateLimiterMode::Rps>;
-using TEvCommitTransactionRequest = TGRpcRequestWrapper<TRpcServices::EvCommitTransaction, Ydb::Table::CommitTransactionRequest, Ydb::Table::CommitTransactionResponse, true>;
-using TEvRollbackTransactionRequest = TGRpcRequestWrapper<TRpcServices::EvRollbackTransaction, Ydb::Table::RollbackTransactionRequest, Ydb::Table::RollbackTransactionResponse, true>;
using TEvCreateTenantRequest = TGRpcRequestWrapper<TRpcServices::EvCreateTenant, Ydb::Cms::CreateDatabaseRequest, Ydb::Cms::CreateDatabaseResponse, true>;
using TEvAlterTenantRequest = TGRpcRequestWrapper<TRpcServices::EvAlterTenant, Ydb::Cms::AlterDatabaseRequest, Ydb::Cms::AlterDatabaseResponse, true>;
using TEvGetTenantStatusRequest = TGRpcRequestWrapper<TRpcServices::EvGetTenantStatus, Ydb::Cms::GetDatabaseStatusRequest, Ydb::Cms::GetDatabaseStatusResponse, true>;
@@ -74,7 +56,6 @@ using TEvListTenantsRequest = TGRpcRequestWrapper<TRpcServices::EvListTenants, Y
using TEvRemoveTenantRequest = TGRpcRequestWrapper<TRpcServices::EvRemoveTenant, Ydb::Cms::RemoveDatabaseRequest, Ydb::Cms::RemoveDatabaseResponse, true>;
using TEvListEndpointsRequest = TGRpcRequestWrapper<TRpcServices::EvListEndpoints, Ydb::Discovery::ListEndpointsRequest, Ydb::Discovery::ListEndpointsResponse, true>;
using TEvDescribeTenantOptionsRequest = TGRpcRequestWrapper<TRpcServices::EvDescribeTenantOptions, Ydb::Cms::DescribeDatabaseOptionsRequest, Ydb::Cms::DescribeDatabaseOptionsResponse, true>;
-using TEvDescribeTableOptionsRequest = TGRpcRequestWrapper<TRpcServices::EvDescribeTableOptions, Ydb::Table::DescribeTableOptionsRequest, Ydb::Table::DescribeTableOptionsResponse, true, TRateLimiterMode::Rps>;
using TEvCreateCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvCreateCoordinationNode, Ydb::Coordination::CreateNodeRequest, Ydb::Coordination::CreateNodeResponse, true, TRateLimiterMode::Rps>;
using TEvAlterCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvAlterCoordinationNode, Ydb::Coordination::AlterNodeRequest, Ydb::Coordination::AlterNodeResponse, true, TRateLimiterMode::Rps>;
using TEvDropCoordinationNode = TGRpcRequestWrapper<TRpcServices::EvDropCoordinationNode, Ydb::Coordination::DropNodeRequest, Ydb::Coordination::DropNodeResponse, true, TRateLimiterMode::Rps>;
@@ -99,7 +80,6 @@ using TEvExportToS3Request = TGRpcRequestValidationWrapper<TRpcServices::EvExpor
using TEvImportFromS3Request = TGRpcRequestValidationWrapper<TRpcServices::EvImportFromS3, Ydb::Import::ImportFromS3Request, Ydb::Import::ImportFromS3Response, true>;
using TEvImportDataRequest = TGRpcRequestValidationWrapper<TRpcServices::EvImportData, Ydb::Import::ImportDataRequest, Ydb::Import::ImportDataResponse, true>;
using TEvDiscoverPQClustersRequest = TGRpcRequestWrapper<TRpcServices::EvDiscoverPQClusters, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersRequest, Ydb::PersQueue::ClusterDiscovery::DiscoverClustersResponse, true>;
-using TEvBulkUpsertRequest = TGRpcRequestWrapper<TRpcServices::EvBulkUpsert, Ydb::Table::BulkUpsertRequest, Ydb::Table::BulkUpsertResponse, true, TRateLimiterMode::Ru>;
using TEvWhoAmIRequest = TGRpcRequestWrapper<TRpcServices::EvWhoAmI, Ydb::Discovery::WhoAmIRequest, Ydb::Discovery::WhoAmIResponse, true, TRateLimiterMode::Rps>;
using TEvCreateRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvCreateRateLimiterResource, Ydb::RateLimiter::CreateResourceRequest, Ydb::RateLimiter::CreateResourceResponse, true, TRateLimiterMode::Rps>;
using TEvAlterRateLimiterResource = TGRpcRequestWrapper<TRpcServices::EvAlterRateLimiterResource, Ydb::RateLimiter::AlterResourceRequest, Ydb::RateLimiter::AlterResourceResponse, true, TRateLimiterMode::Rps>;
@@ -112,7 +92,6 @@ using TEvKikhouseRefreshSnapshotRequest = TGRpcRequestWrapper<TRpcServices::EvKi
using TEvKikhouseDiscardSnapshotRequest = TGRpcRequestWrapper<TRpcServices::EvKikhouseDiscardSnapshot, Ydb::ClickhouseInternal::DiscardSnapshotRequest, Ydb::ClickhouseInternal::DiscardSnapshotResponse, true>;
using TEvSelfCheckRequest = TGRpcRequestWrapper<TRpcServices::EvSelfCheck, Ydb::Monitoring::SelfCheckRequest, Ydb::Monitoring::SelfCheckResponse, true>;
using TEvLoginRequest = TGRpcRequestWrapperNoAuth<TRpcServices::EvLogin, Ydb::Auth::LoginRequest, Ydb::Auth::LoginResponse>;
-using TEvStreamExecuteScanQueryRequest = TGRpcRequestWrapper<TRpcServices::EvStreamExecuteScanQuery, Ydb::Table::ExecuteScanQueryRequest, Ydb::Table::ExecuteScanQueryPartialResponse, false, TRateLimiterMode::RuOnProgress>;
using TEvCoordinationSessionRequest = TGRpcRequestBiStreamWrapper<TRpcServices::EvCoordinationSession, Ydb::Coordination::SessionRequest, Ydb::Coordination::SessionResponse>;
using TEvLongTxBeginRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxBegin, Ydb::LongTx::BeginTransactionRequest, Ydb::LongTx::BeginTransactionResponse, true>;
using TEvLongTxCommitRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxCommit, Ydb::LongTx::CommitTransactionRequest, Ydb::LongTx::CommitTransactionResponse, true>;
diff --git a/ydb/core/grpc_services/rpc_commit_transaction.cpp b/ydb/core/grpc_services/rpc_commit_transaction.cpp
index 3b85b5ef62d..561bf1e2b1e 100644
--- a/ydb/core/grpc_services/rpc_commit_transaction.cpp
+++ b/ydb/core/grpc_services/rpc_commit_transaction.cpp
@@ -1,8 +1,10 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_calls.h"
#include "rpc_kqp_base.h"
#include "rpc_common.h"
+#include "service_table.h"
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -14,11 +16,14 @@ using namespace NActors;
using namespace Ydb;
using namespace NKqp;
+using TEvCommitTransactionRequest = TGrpcRequestOperationCall<Ydb::Table::CommitTransactionRequest,
+Ydb::Table::CommitTransactionResponse>;
+
class TCommitTransactionRPC : public TRpcKqpRequestActor<TCommitTransactionRPC, TEvCommitTransactionRequest> {
using TBase = TRpcKqpRequestActor<TCommitTransactionRPC, TEvCommitTransactionRequest>;
public:
- TCommitTransactionRPC(TEvCommitTransactionRequest* msg)
+ TCommitTransactionRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext& ctx) {
@@ -92,8 +97,8 @@ private:
}
};
-void TGRpcRequestProxy::Handle(TEvCommitTransactionRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TCommitTransactionRPC(ev->Release().Release()));
+void DoCommitTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TCommitTransactionRPC(p.release()));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_common.h b/ydb/core/grpc_services/rpc_common.h
index 075318037c9..dbcd5f9e1b3 100644
--- a/ydb/core/grpc_services/rpc_common.h
+++ b/ydb/core/grpc_services/rpc_common.h
@@ -8,6 +8,7 @@
namespace NKikimr {
namespace NGRpcService {
+class IRequestCtx;
template<typename TEv>
inline void SetRlPath(TEv& ev, const IRequestCtx& ctx) {
diff --git a/ydb/core/grpc_services/rpc_copy_table.cpp b/ydb/core/grpc_services/rpc_copy_table.cpp
index 74a0c9d9a81..125e4e4377e 100644
--- a/ydb/core/grpc_services/rpc_copy_table.cpp
+++ b/ydb/core/grpc_services/rpc_copy_table.cpp
@@ -1,4 +1,5 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_calls.h"
#include "rpc_scheme_base.h"
@@ -10,11 +11,14 @@ namespace NGRpcService {
using namespace NActors;
using namespace Ydb;
+using TEvCopyTableRequest = TGrpcRequestOperationCall<Ydb::Table::CopyTableRequest,
+ Ydb::Table::CopyTableResponse>;
+
class TCopyTableRPC : public TRpcSchemeRequestActor<TCopyTableRPC, TEvCopyTableRequest> {
using TBase = TRpcSchemeRequestActor<TCopyTableRPC, TEvCopyTableRequest>;
public:
- TCopyTableRPC(TEvCopyTableRequest* msg)
+ TCopyTableRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext &ctx) {
@@ -50,8 +54,8 @@ private:
}
};
-void TGRpcRequestProxy::Handle(TEvCopyTableRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TCopyTableRPC(ev->Release().Release()));
+void DoCopyTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TCopyTableRPC(p.release()));
}
} // namespace NKikimr
diff --git a/ydb/core/grpc_services/rpc_copy_tables.cpp b/ydb/core/grpc_services/rpc_copy_tables.cpp
index 1f5fd281a51..10f2fe0cbe6 100644
--- a/ydb/core/grpc_services/rpc_copy_tables.cpp
+++ b/ydb/core/grpc_services/rpc_copy_tables.cpp
@@ -1,4 +1,5 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_calls.h"
#include "rpc_scheme_base.h"
@@ -10,11 +11,14 @@ namespace NGRpcService {
using namespace NActors;
using namespace Ydb;
+using TEvCopyTablesRequest = TGrpcRequestOperationCall<Ydb::Table::CopyTablesRequest,
+ Ydb::Table::CopyTablesResponse>;
+
class TCopyTablesRPC : public TRpcSchemeRequestActor<TCopyTablesRPC, TEvCopyTablesRequest> {
using TBase = TRpcSchemeRequestActor<TCopyTablesRPC, TEvCopyTablesRequest>;
public:
- TCopyTablesRPC(TEvCopyTablesRequest* msg)
+ TCopyTablesRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext &ctx) {
@@ -44,8 +48,8 @@ private:
}
};
-void TGRpcRequestProxy::Handle(TEvCopyTablesRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TCopyTablesRPC(ev->Release().Release()));
+void DoCopyTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TCopyTablesRPC(p.release()));
}
} // namespace NKikimr
diff --git a/ydb/core/grpc_services/rpc_create_session.cpp b/ydb/core/grpc_services/rpc_create_session.cpp
index 2f6e9fae266..c4e6c6f9ff5 100644
--- a/ydb/core/grpc_services/rpc_create_session.cpp
+++ b/ydb/core/grpc_services/rpc_create_session.cpp
@@ -1,8 +1,9 @@
-#include "grpc_request_proxy.h"
-
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_calls.h"
#include "rpc_common.h"
#include "rpc_kqp_base.h"
+#include "service_table.h"
#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
@@ -16,6 +17,9 @@ using namespace NActors;
using namespace Ydb;
using namespace NKqp;
+using TEvCreateSessionRequest = TGrpcRequestOperationCall<Ydb::Table::CreateSessionRequest,
+ Ydb::Table::CreateSessionResponse>;
+
class TCreateSessionRPC : public TRpcKqpRequestActor<TCreateSessionRPC, TEvCreateSessionRequest> {
using TBase = TRpcKqpRequestActor<TCreateSessionRPC, TEvCreateSessionRequest>;
@@ -98,7 +102,10 @@ private:
auto database = Request_->GetDatabaseName().GetOrElse("");
- auto actorId = NRpcService::DoLocalRpcSameMailbox<NKikimr::NGRpcService::TEvDeleteSessionRequest>(
+ using TEvDeleteSessionRequest = TGrpcRequestOperationCall<Ydb::Table::DeleteSessionRequest,
+ Ydb::Table::DeleteSessionResponse>;
+
+ auto actorId = NRpcService::DoLocalRpcSameMailbox<TEvDeleteSessionRequest>(
std::move(request), std::move(cb), database, Request_->GetInternalToken(), ctx);
LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::GRPC_PROXY,
@@ -134,8 +141,8 @@ private:
};
-void TGRpcRequestProxy::Handle(TEvCreateSessionRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TCreateSessionRPC(ev->Release().Release()));
+void DoCreateSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TCreateSessionRPC(p.release()));
}
template<>
diff --git a/ydb/core/grpc_services/rpc_create_table.cpp b/ydb/core/grpc_services/rpc_create_table.cpp
index 302dfde57a8..209b7483682 100644
--- a/ydb/core/grpc_services/rpc_create_table.cpp
+++ b/ydb/core/grpc_services/rpc_create_table.cpp
@@ -1,5 +1,5 @@
-#include "grpc_request_proxy.h"
-
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_calls.h"
#include "rpc_scheme_base.h"
#include "rpc_common.h"
@@ -21,11 +21,14 @@ using namespace NConsole;
using namespace Ydb;
using namespace Ydb::Table;
+using TEvCreateTableRequest = TGrpcRequestOperationCall<Ydb::Table::CreateTableRequest,
+ Ydb::Table::CreateTableResponse>;
+
class TCreateTableRPC : public TRpcSchemeRequestActor<TCreateTableRPC, TEvCreateTableRequest> {
using TBase = TRpcSchemeRequestActor<TCreateTableRPC, TEvCreateTableRequest>;
public:
- TCreateTableRPC(TEvCreateTableRequest* msg)
+ TCreateTableRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext &ctx) {
@@ -226,8 +229,8 @@ private:
TTableProfiles Profiles;
};
-void TGRpcRequestProxy::Handle(TEvCreateTableRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TCreateTableRPC(ev->Release().Release()));
+void DoCreateTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TCreateTableRPC(p.release()));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_delete_session.cpp b/ydb/core/grpc_services/rpc_delete_session.cpp
index a2245af2362..10d1683c7ea 100644
--- a/ydb/core/grpc_services/rpc_delete_session.cpp
+++ b/ydb/core/grpc_services/rpc_delete_session.cpp
@@ -1,7 +1,9 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_calls.h"
#include "rpc_kqp_base.h"
+#include "service_table.h"
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -13,6 +15,9 @@ using namespace NActors;
using namespace Ydb;
using namespace NKqp;
+using TEvDeleteSessionRequest = TGrpcRequestOperationCall<Ydb::Table::DeleteSessionRequest,
+ Ydb::Table::DeleteSessionResponse>;
+
class TDeleteSessionRPC : public TRpcKqpRequestActor<TDeleteSessionRPC, TEvDeleteSessionRequest> {
using TBase = TRpcKqpRequestActor<TDeleteSessionRPC, TEvDeleteSessionRequest>;
@@ -46,8 +51,8 @@ private:
}
};
-void TGRpcRequestProxy::Handle(TEvDeleteSessionRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TDeleteSessionRPC(ev->Release().Release()));
+void DoDeleteSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TDeleteSessionRPC(p.release()));
}
template<>
diff --git a/ydb/core/grpc_services/rpc_describe_table.cpp b/ydb/core/grpc_services/rpc_describe_table.cpp
index 261f10d4676..df8dbdfb9b7 100644
--- a/ydb/core/grpc_services/rpc_describe_table.cpp
+++ b/ydb/core/grpc_services/rpc_describe_table.cpp
@@ -1,8 +1,10 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_calls.h"
#include "rpc_scheme_base.h"
+#include "service_table.h"
#include "rpc_common.h"
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/ydb_convert/table_description.h>
@@ -14,11 +16,14 @@ namespace NGRpcService {
using namespace NActors;
using namespace Ydb;
+using TEvDescribeTableRequest = TGrpcRequestOperationCall<Ydb::Table::DescribeTableRequest,
+ Ydb::Table::DescribeTableResponse>;
+
class TDescribeTableRPC : public TRpcSchemeRequestActor<TDescribeTableRPC, TEvDescribeTableRequest> {
using TBase = TRpcSchemeRequestActor<TDescribeTableRPC, TEvDescribeTableRequest>;
public:
- TDescribeTableRPC(TEvDescribeTableRequest* msg)
+ TDescribeTableRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext &ctx) {
@@ -133,8 +138,8 @@ private:
}
};
-void TGRpcRequestProxy::Handle(TEvDescribeTableRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TDescribeTableRPC(ev->Release().Release()));
+void DoDescribeTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TDescribeTableRPC(p.release()));
}
} // namespace NKikimr
diff --git a/ydb/core/grpc_services/rpc_describe_table_options.cpp b/ydb/core/grpc_services/rpc_describe_table_options.cpp
index 6d77e3cff71..057ffc8ccd7 100644
--- a/ydb/core/grpc_services/rpc_describe_table_options.cpp
+++ b/ydb/core/grpc_services/rpc_describe_table_options.cpp
@@ -1,9 +1,11 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_calls.h"
#include "rpc_scheme_base.h"
#include "rpc_common.h"
#include "table_profiles.h"
+#include "service_table.h"
#include <ydb/core/cms/console/configs_dispatcher.h>
#include <ydb/core/protos/console_config.pb.h>
@@ -17,11 +19,14 @@ using namespace NConsole;
using namespace Ydb;
using namespace Ydb::Table;
+using TEvDescribeTableOptionsRequest = TGrpcRequestOperationCall<Ydb::Table::DescribeTableOptionsRequest,
+ Ydb::Table::DescribeTableOptionsResponse>;
+
class TDescribeTableOptionsRPC : public TRpcSchemeRequestActor<TDescribeTableOptionsRPC, TEvDescribeTableOptionsRequest> {
using TBase = TRpcSchemeRequestActor<TDescribeTableOptionsRPC, TEvDescribeTableOptionsRequest>;
public:
- TDescribeTableOptionsRPC(TEvDescribeTableOptionsRequest* msg)
+ TDescribeTableOptionsRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext &ctx) {
@@ -202,8 +207,8 @@ private:
TTableProfiles Profiles;
};
-void TGRpcRequestProxy::Handle(TEvDescribeTableOptionsRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TDescribeTableOptionsRPC(ev->Release().Release()));
+void DoDescribeTableOptionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TDescribeTableOptionsRPC(p.release()));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_drop_table.cpp b/ydb/core/grpc_services/rpc_drop_table.cpp
index 27899a9d773..d1c70f2dce3 100644
--- a/ydb/core/grpc_services/rpc_drop_table.cpp
+++ b/ydb/core/grpc_services/rpc_drop_table.cpp
@@ -1,5 +1,7 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
+#include "service_table.h"
#include "rpc_calls.h"
#include "rpc_scheme_base.h"
#include "rpc_common.h"
@@ -10,6 +12,9 @@ namespace NGRpcService {
using namespace NActors;
using namespace Ydb;
+using TEvDropTableRequest = TGrpcRequestOperationCall<Ydb::Table::DropTableRequest,
+ Ydb::Table::DropTableResponse>;
+
class TDropTableRPC : public TRpcSchemeRequestActor<TDropTableRPC, TEvDropTableRequest> {
using TBase = TRpcSchemeRequestActor<TDropTableRPC, TEvDropTableRequest>;
@@ -54,8 +59,8 @@ private:
}
};
-void TGRpcRequestProxy::Handle(TEvDropTableRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TDropTableRPC(ev->Release().Release()));
+void DoDropTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TDropTableRPC(p.release()));
}
template<>
diff --git a/ydb/core/grpc_services/rpc_execute_data_query.cpp b/ydb/core/grpc_services/rpc_execute_data_query.cpp
index 317a3488ce0..0f4a7072578 100644
--- a/ydb/core/grpc_services/rpc_execute_data_query.cpp
+++ b/ydb/core/grpc_services/rpc_execute_data_query.cpp
@@ -1,9 +1,11 @@
-#include "grpc_request_proxy.h"
-
-#include "rpc_calls.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_kqp_base.h"
#include "rpc_common.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/public/api/protos/ydb_scheme.pb.h>
#include <ydb/core/base/counters.h>
#include <ydb/core/protos/console_config.pb.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
@@ -20,13 +22,16 @@ using namespace Ydb;
using namespace Ydb::Table;
using namespace NKqp;
+using TEvExecuteDataQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteDataQueryRequest,
+ Ydb::Table::ExecuteDataQueryResponse>;
+
class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TEvExecuteDataQueryRequest> {
using TBase = TRpcKqpRequestActor<TExecuteDataQueryRPC, TEvExecuteDataQueryRequest>;
public:
using TResult = Ydb::Table::ExecuteQueryResult;
- TExecuteDataQueryRPC(TEvExecuteDataQueryRequest* msg)
+ TExecuteDataQueryRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext &ctx) {
@@ -243,8 +248,8 @@ public:
}
};
-void TGRpcRequestProxy::Handle(TEvExecuteDataQueryRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TExecuteDataQueryRPC(ev->Release().Release()));
+void DoExecuteDataQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TExecuteDataQueryRPC(p.release()));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_execute_scheme_query.cpp b/ydb/core/grpc_services/rpc_execute_scheme_query.cpp
index 6239bd7798a..dd75c0851ac 100644
--- a/ydb/core/grpc_services/rpc_execute_scheme_query.cpp
+++ b/ydb/core/grpc_services/rpc_execute_scheme_query.cpp
@@ -1,8 +1,10 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_calls.h"
#include "rpc_kqp_base.h"
#include "rpc_common.h"
+#include "service_table.h"
#include <ydb/core/protos/console_config.pb.h>
@@ -16,6 +18,9 @@ using namespace NActors;
using namespace Ydb;
using namespace NKqp;
+using TEvExecuteSchemeQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteSchemeQueryRequest,
+ Ydb::Table::ExecuteSchemeQueryResponse>;
+
class TExecuteSchemeQueryRPC : public TRpcKqpRequestActor<TExecuteSchemeQueryRPC, TEvExecuteSchemeQueryRequest> {
using TBase = TRpcKqpRequestActor<TExecuteSchemeQueryRPC, TEvExecuteSchemeQueryRequest>;
@@ -87,8 +92,8 @@ public:
}
};
-void TGRpcRequestProxy::Handle(TEvExecuteSchemeQueryRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TExecuteSchemeQueryRPC(ev->Release().Release()));
+void DoExecuteSchemeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TExecuteSchemeQueryRPC(p.release()));
}
template<>
diff --git a/ydb/core/grpc_services/rpc_explain_data_query.cpp b/ydb/core/grpc_services/rpc_explain_data_query.cpp
index cc42fb92358..eafa612e2d6 100644
--- a/ydb/core/grpc_services/rpc_explain_data_query.cpp
+++ b/ydb/core/grpc_services/rpc_explain_data_query.cpp
@@ -1,8 +1,10 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_calls.h"
#include "rpc_kqp_base.h"
#include "rpc_common.h"
+#include "service_table.h"
#include <ydb/core/protos/console_config.pb.h>
@@ -18,11 +20,14 @@ using namespace NActors;
using namespace Ydb;
using namespace NKqp;
+using TEvExplainDataQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExplainDataQueryRequest,
+ Ydb::Table::ExplainDataQueryResponse>;
+
class TExplainDataQueryRPC : public TRpcKqpRequestActor<TExplainDataQueryRPC, TEvExplainDataQueryRequest> {
using TBase = TRpcKqpRequestActor<TExplainDataQueryRPC, TEvExplainDataQueryRequest>;
public:
- TExplainDataQueryRPC(TEvExplainDataQueryRequest* msg)
+ TExplainDataQueryRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext &ctx) {
@@ -87,8 +92,8 @@ public:
}
};
-void TGRpcRequestProxy::Handle(TEvExplainDataQueryRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TExplainDataQueryRPC(ev->Release().Release()));
+void DoExplainDataQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TExplainDataQueryRPC(p.release()));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_keep_alive.cpp b/ydb/core/grpc_services/rpc_keep_alive.cpp
index e4188c8ab91..9a38d0a3a97 100644
--- a/ydb/core/grpc_services/rpc_keep_alive.cpp
+++ b/ydb/core/grpc_services/rpc_keep_alive.cpp
@@ -1,9 +1,12 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_calls.h"
#include "rpc_kqp_base.h"
#include "rpc_common.h"
+#include "service_table.h"
+
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -14,11 +17,14 @@ using namespace NActors;
using namespace Ydb;
using namespace NKqp;
+using TEvKeepAliveRequest = TGrpcRequestOperationCall<Ydb::Table::KeepAliveRequest,
+ Ydb::Table::KeepAliveResponse>;
+
class TKeepAliveRPC : public TRpcKqpRequestActor<TKeepAliveRPC, TEvKeepAliveRequest> {
using TBase = TRpcKqpRequestActor<TKeepAliveRPC, TEvKeepAliveRequest>;
public:
- TKeepAliveRPC(TEvKeepAliveRequest* msg)
+ TKeepAliveRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext& ctx) {
@@ -87,8 +93,8 @@ private:
}
};
-void TGRpcRequestProxy::Handle(TEvKeepAliveRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TKeepAliveRPC(ev->Release().Release()));
+void DoKeepAliveRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TKeepAliveRPC(p.release()));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp
index 127347e067e..a4936f1c350 100644
--- a/ydb/core/grpc_services/rpc_load_rows.cpp
+++ b/ydb/core/grpc_services/rpc_load_rows.cpp
@@ -1,7 +1,8 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
-#include "rpc_calls.h"
#include "rpc_common.h"
+#include "service_table.h"
#include <ydb/core/tx/tx_proxy/upload_rows_common_impl.h>
#include <ydb/library/yql/public/udf/udf_types.h>
@@ -173,10 +174,13 @@ bool ConvertArrowToYdbPrimitive(const arrow::DataType& type, Ydb::Type& toType)
}
+using TEvBulkUpsertRequest = TGrpcRequestOperationCall<Ydb::Table::BulkUpsertRequest,
+ Ydb::Table::BulkUpsertResponse>;
+
class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> {
using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>;
public:
- explicit TUploadRowsRPCPublic(TAutoPtr<TEvBulkUpsertRequest> request)
+ explicit TUploadRowsRPCPublic(TEvBulkUpsertRequest* request)
: TBase(GetDuration(request->GetProtoRequest()->operation_params().operation_timeout()))
, Request(request)
{}
@@ -432,14 +436,14 @@ private:
}
private:
- TAutoPtr<TEvBulkUpsertRequest> Request;
+ std::unique_ptr<TEvBulkUpsertRequest> Request;
TVector<std::pair<TSerializedCellVec, TString>> AllRows;
};
class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> {
using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>;
public:
- explicit TUploadColumnsRPCPublic(TAutoPtr<TEvBulkUpsertRequest> request)
+ explicit TUploadColumnsRPCPublic(TEvBulkUpsertRequest* request)
: TBase(GetDuration(request->GetProtoRequest()->operation_params().operation_timeout()))
, Request(request)
{}
@@ -646,7 +650,7 @@ private:
}
private:
- TAutoPtr<TEvBulkUpsertRequest> Request;
+ std::unique_ptr<TEvBulkUpsertRequest> Request;
TVector<std::pair<TSerializedCellVec, TString>> Rows;
const Ydb::Formats::CsvSettings& GetCsvSettings() const {
@@ -654,14 +658,16 @@ private:
}
};
-void TGRpcRequestProxy::Handle(TEvBulkUpsertRequest::TPtr& ev, const TActorContext& ctx) {
- auto* req = ev->Get()->GetProtoRequest();
- if (req->has_arrow_batch_settings()) {
- ctx.Register(new TUploadColumnsRPCPublic(ev->Release().Release()));
- } else if (req->has_csv_settings()) {
- ctx.Register(new TUploadColumnsRPCPublic(ev->Release().Release()));
+void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+
+ auto* req = dynamic_cast<TEvBulkUpsertRequest*>(p.release());
+ Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper");
+ if (req->GetProtoRequest()->has_arrow_batch_settings()) {
+ TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(req));
+ } else if (req->GetProtoRequest()->has_csv_settings()) {
+ TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(req));
} else {
- ctx.Register(new TUploadRowsRPCPublic(ev->Release().Release()));
+ TActivationContext::AsActorContext().Register(new TUploadRowsRPCPublic(req));
}
}
diff --git a/ydb/core/grpc_services/rpc_prepare_data_query.cpp b/ydb/core/grpc_services/rpc_prepare_data_query.cpp
index eb4c5718774..7c2adce96cc 100644
--- a/ydb/core/grpc_services/rpc_prepare_data_query.cpp
+++ b/ydb/core/grpc_services/rpc_prepare_data_query.cpp
@@ -1,8 +1,10 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_calls.h"
#include "rpc_kqp_base.h"
#include "rpc_common.h"
+#include "service_table.h"
#include <ydb/core/protos/console_config.pb.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
@@ -19,11 +21,15 @@ using namespace NOperationId;
using namespace Ydb;
using namespace NKqp;
+using TEvPrepareDataQueryRequest = TGrpcRequestOperationCall<Ydb::Table::PrepareDataQueryRequest,
+ Ydb::Table::PrepareDataQueryResponse>;
+
+
class TPrepareDataQueryRPC : public TRpcKqpRequestActor<TPrepareDataQueryRPC, TEvPrepareDataQueryRequest> {
using TBase = TRpcKqpRequestActor<TPrepareDataQueryRPC, TEvPrepareDataQueryRequest>;
public:
- TPrepareDataQueryRPC(TEvPrepareDataQueryRequest* msg)
+ TPrepareDataQueryRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext &ctx) {
@@ -104,8 +110,8 @@ public:
}
};
-void TGRpcRequestProxy::Handle(TEvPrepareDataQueryRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TPrepareDataQueryRPC(ev->Release().Release()));
+void DoPrepareDataQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TPrepareDataQueryRPC(p.release()));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_read_table.cpp b/ydb/core/grpc_services/rpc_read_table.cpp
index bbb87907289..5b6b139d864 100644
--- a/ydb/core/grpc_services/rpc_read_table.cpp
+++ b/ydb/core/grpc_services/rpc_read_table.cpp
@@ -1,9 +1,11 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_common.h"
#include "rpc_calls.h"
#include "rpc_kqp_base.h"
#include "local_rate_limiter.h"
+#include "service_table.h"
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -27,6 +29,9 @@ using namespace NActors;
using namespace Ydb;
using namespace NKqp;
+using TEvReadTableRequest = TGrpcRequestNoOperationCall<Ydb::Table::ReadTableRequest,
+ Ydb::Table::ReadTableResponse>;
+
static const TDuration RlMaxDuration = TDuration::Minutes(1);
static ui64 CalcRuConsumption(const TString& data) {
@@ -107,7 +112,7 @@ public:
return NKikimrServices::TActivity::GRPC_STREAM_REQ;
}
- TReadTableRPC(TEvReadTableRequest* msg)
+ TReadTableRPC(IRequestNoOpCtx* msg)
: Request_(msg)
, QuotaLimit_(10)
, QuotaReserved_(0)
@@ -431,7 +436,7 @@ private:
}
void SendProposeRequest(const TActorContext &ctx) {
- const auto req = Request_->GetProtoRequest();
+ const auto req = TEvReadTableRequest::GetProtoRequest(Request_.get());
auto actorId = SelfId();
const TActorSystem* const as = ctx.ExecutorThread.ActorSystem;
auto cb = [actorId, as](size_t left) {
@@ -721,7 +726,7 @@ private:
TryToAllocateQuota();
}
- std::unique_ptr<TEvReadTableRequest> Request_;
+ std::unique_ptr<IRequestNoOpCtx> Request_;
TActorId ReadTableActor;
@@ -754,8 +759,8 @@ private:
bool HasPendingSuccess = false;
};
-void TGRpcRequestProxy::Handle(TEvReadTableRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TReadTableRPC(ev->Release().Release()));
+void DoReadTableRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TReadTableRPC(p.release()));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_rename_tables.cpp b/ydb/core/grpc_services/rpc_rename_tables.cpp
index 69684e889b1..015f55e9957 100644
--- a/ydb/core/grpc_services/rpc_rename_tables.cpp
+++ b/ydb/core/grpc_services/rpc_rename_tables.cpp
@@ -1,5 +1,7 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
+#include "service_table.h"
#include "rpc_calls.h"
#include "rpc_scheme_base.h"
#include "rpc_common.h"
@@ -10,11 +12,14 @@ namespace NGRpcService {
using namespace NActors;
using namespace Ydb;
+using TEvRenameTablesRequest = TGrpcRequestOperationCall<Ydb::Table::RenameTablesRequest,
+ Ydb::Table::RenameTablesResponse>;
+
class TRenameTablesRPC : public TRpcSchemeRequestActor<TRenameTablesRPC, TEvRenameTablesRequest> {
using TBase = TRpcSchemeRequestActor<TRenameTablesRPC, TEvRenameTablesRequest>;
public:
- TRenameTablesRPC(TEvRenameTablesRequest* msg)
+ TRenameTablesRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext &ctx) {
@@ -63,8 +68,8 @@ private:
}
};
-void TGRpcRequestProxy::Handle(TEvRenameTablesRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TRenameTablesRPC(ev->Release().Release()));
+void DoRenameTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TRenameTablesRPC(p.release()));
}
} // namespace NKikimr
diff --git a/ydb/core/grpc_services/rpc_rollback_transaction.cpp b/ydb/core/grpc_services/rpc_rollback_transaction.cpp
index b6cdfe28177..12c102d4839 100644
--- a/ydb/core/grpc_services/rpc_rollback_transaction.cpp
+++ b/ydb/core/grpc_services/rpc_rollback_transaction.cpp
@@ -1,8 +1,10 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_calls.h"
#include "rpc_kqp_base.h"
#include "rpc_common.h"
+#include "service_table.h"
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/yql/public/issue/yql_issue.h>
@@ -14,11 +16,14 @@ using namespace NActors;
using namespace Ydb;
using namespace NKqp;
+using TEvRollbackTransactionRequest = TGrpcRequestOperationCall<Ydb::Table::RollbackTransactionRequest,
+ Ydb::Table::RollbackTransactionResponse>;
+
class TRollbackTransactionRPC : public TRpcKqpRequestActor<TRollbackTransactionRPC, TEvRollbackTransactionRequest> {
using TBase = TRpcKqpRequestActor<TRollbackTransactionRPC, TEvRollbackTransactionRequest>;
public:
- TRollbackTransactionRPC(TEvRollbackTransactionRequest* msg)
+ TRollbackTransactionRPC(IRequestOpCtx* msg)
: TBase(msg) {}
void Bootstrap(const TActorContext& ctx) {
@@ -90,8 +95,8 @@ private:
}
};
-void TGRpcRequestProxy::Handle(TEvRollbackTransactionRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TRollbackTransactionRPC(ev->Release().Release()));
+void DoRollbackTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ TActivationContext::AsActorContext().Register(new TRollbackTransactionRPC(p.release()));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
index 69d14f9ad74..f50883e7993 100644
--- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
+++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
@@ -1,7 +1,9 @@
-#include "grpc_request_proxy.h"
+#include "service_table.h"
+#include <ydb/core/grpc_services/base/base.h>
#include "rpc_common.h"
#include "rpc_kqp_base.h"
+#include "service_table.h"
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/appdata.h>
@@ -240,6 +242,9 @@ bool FillProfile(Ydb::Table::ExecuteScanQueryPartialResponse& response,
return false;
}
+using TEvStreamExecuteScanQueryRequest = TGrpcRequestNoOperationCall<Ydb::Table::ExecuteScanQueryRequest,
+ Ydb::Table::ExecuteScanQueryPartialResponse>;
+
template<typename TRequestEv, typename TResponse>
class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQueryRPC<TRequestEv, TResponse>> {
private:
@@ -629,10 +634,12 @@ void TGRpcRequestProxy::Handle(TEvExperimentalStreamQueryRequest::TPtr& ev, cons
Ydb::Experimental::ExecuteStreamQueryResponse>(ev->Release().Release(), rpcBufferSize));
}
-void TGRpcRequestProxy::Handle(TEvStreamExecuteScanQueryRequest::TPtr& ev, const TActorContext& ctx) {
- ui64 rpcBufferSize = GetAppConfig().GetTableServiceConfig().GetResourceManager().GetChannelBufferSize();
- ctx.Register(new TStreamExecuteScanQueryRPC<TEvStreamExecuteScanQueryRequest,
- Ydb::Table::ExecuteScanQueryPartialResponse>(ev->Release().Release(), rpcBufferSize));
+void DoExecuteScanQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
+ ui64 rpcBufferSize = f.GetAppConfig().GetTableServiceConfig().GetResourceManager().GetChannelBufferSize();
+ auto* req = dynamic_cast<TEvStreamExecuteScanQueryRequest*>(p.release());
+ Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper");
+ TActivationContext::AsActorContext().Register(new TStreamExecuteScanQueryRPC<TEvStreamExecuteScanQueryRequest,
+ Ydb::Table::ExecuteScanQueryPartialResponse>(req, rpcBufferSize));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/service_table.h b/ydb/core/grpc_services/service_table.h
new file mode 100644
index 00000000000..adf8493daac
--- /dev/null
+++ b/ydb/core/grpc_services/service_table.h
@@ -0,0 +1,35 @@
+#pragma once
+
+#include <memory>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class IRequestOpCtx;
+class IRequestNoOpCtx;
+class IFacilityProvider;
+
+void DoAlterTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoCreateTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDropTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoCopyTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoCopyTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoRenameTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDescribeTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoCreateSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDeleteSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoKeepAliveRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoReadTableRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&);
+void DoExplainDataQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoPrepareDataQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoExecuteDataQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoExecuteSchemeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoBeginTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoCommitTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoRollbackTransactionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDescribeTableOptionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoExecuteScanQueryRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider&);
+
+}
+}
diff --git a/ydb/core/kqp/kqp_ic_gateway.cpp b/ydb/core/kqp/kqp_ic_gateway.cpp
index 196da496c99..305b165b7ed 100644
--- a/ydb/core/kqp/kqp_ic_gateway.cpp
+++ b/ydb/core/kqp/kqp_ic_gateway.cpp
@@ -1229,7 +1229,8 @@ public:
// FIXME: should be defined in grpc_services/rpc_calls.h, but cause cyclic dependency
using namespace NGRpcService;
- using TEvAlterTableRequest = TGRpcRequestValidationWrapper<TRpcServices::EvAlterTable, Ydb::Table::AlterTableRequest, Ydb::Table::AlterTableResponse, true, TRateLimiterMode::Rps>;
+ using TEvAlterTableRequest = TGrpcRequestOperationCall<Ydb::Table::AlterTableRequest,
+ Ydb::Table::AlterTableResponse>;
return SendLocalRpcRequestNoResult<TEvAlterTableRequest>(std::move(req), Database, GetTokenCompat());
}
@@ -1283,7 +1284,8 @@ public:
// FIXME: should be defined in grpc_services/rpc_calls.h, but cause cyclic dependency
using namespace NGRpcService;
- using TEvDropTableRequest = TGRpcRequestWrapper<TRpcServices::EvDropTable, Ydb::Table::DropTableRequest, Ydb::Table::DropTableResponse, true, TRateLimiterMode::Rps>;
+ using TEvDropTableRequest = TGrpcRequestOperationCall<Ydb::Table::DropTableRequest,
+ Ydb::Table::DropTableResponse>;
return SendLocalRpcRequestNoResult<TEvDropTableRequest>(std::move(dropTable), Database, GetTokenCompat());
}
diff --git a/ydb/services/ydb/ydb_table.cpp b/ydb/services/ydb/ydb_table.cpp
index 7189614b8e8..cf136b19bc6 100644
--- a/ydb/services/ydb/ydb_table.cpp
+++ b/ydb/services/ydb/ydb_table.cpp
@@ -1,8 +1,8 @@
#include "ydb_table.h"
+#include <ydb/core/grpc_services/service_table.h>
#include <ydb/core/grpc_services/grpc_helper.h>
-#include <ydb/core/grpc_services/grpc_request_proxy.h>
-#include <ydb/core/grpc_services/rpc_calls.h>
+#include <ydb/core/grpc_services/base/base.h>
namespace NKikimr {
namespace NGRpcService {
@@ -33,90 +33,64 @@ void TGRpcYdbTableService::DecRequest() {
void TGRpcYdbTableService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
-#ifdef ADD_REQUEST
-#error ADD_REQUEST macro already defined
+#ifdef ADD_REQUEST_LIMIT
+#error ADD_REQUEST_LIMIT macro already defined
#endif
-#define ADD_REQUEST(NAME, IN, OUT, ACTION) \
- MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>>(this, &Service_, CQ_, \
- [this](NGrpc::IRequestContextBase *ctx) { \
- NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
- ACTION; \
- }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
- #NAME, logger, getCounterBlock("table", #NAME))->Run();
-
-#define ADD_BYTES_REQUEST(NAME, IN, OUT, ACTION) \
- MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>>(this, &Service_, CQ_, \
- [this](NGrpc::IRequestContextBase *ctx) { \
- NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
- ACTION; \
- }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
- #NAME, logger, getCounterBlock("table", #NAME))->Run();
-
- ADD_REQUEST(CreateSession, CreateSessionRequest, CreateSessionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvCreateSessionRequest(ctx));
- })
- ADD_REQUEST(DeleteSession, DeleteSessionRequest, DeleteSessionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDeleteSessionRequest(ctx));
- })
- ADD_REQUEST(KeepAlive, KeepAliveRequest, KeepAliveResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvKeepAliveRequest(ctx));
- })
- ADD_REQUEST(AlterTable, AlterTableRequest, AlterTableResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvAlterTableRequest(ctx));
- })
- ADD_REQUEST(CreateTable, CreateTableRequest, CreateTableResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvCreateTableRequest(ctx));
- })
- ADD_REQUEST(DropTable, DropTableRequest, DropTableResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDropTableRequest(ctx));
- })
- ADD_BYTES_REQUEST(StreamReadTable, ReadTableRequest, ReadTableResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvReadTableRequest(ctx));
- })
- ADD_REQUEST(DescribeTable, DescribeTableRequest, DescribeTableResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDescribeTableRequest(ctx));
- })
- ADD_REQUEST(CopyTable, CopyTableRequest, CopyTableResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvCopyTableRequest(ctx));
- })
- ADD_REQUEST(CopyTables, CopyTablesRequest, CopyTablesResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvCopyTablesRequest(ctx));
- })
- ADD_REQUEST(RenameTables, RenameTablesRequest, RenameTablesResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvRenameTablesRequest(ctx));
- })
- ADD_REQUEST(ExplainDataQuery, ExplainDataQueryRequest, ExplainDataQueryResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvExplainDataQueryRequest(ctx));
- })
- ADD_REQUEST(PrepareDataQuery, PrepareDataQueryRequest, PrepareDataQueryResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvPrepareDataQueryRequest(ctx));
- })
- ADD_REQUEST(ExecuteDataQuery, ExecuteDataQueryRequest, ExecuteDataQueryResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvExecuteDataQueryRequest(ctx));
- })
- ADD_REQUEST(ExecuteSchemeQuery, ExecuteSchemeQueryRequest, ExecuteSchemeQueryResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvExecuteSchemeQueryRequest(ctx));
- })
- ADD_REQUEST(BeginTransaction, BeginTransactionRequest, BeginTransactionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvBeginTransactionRequest(ctx));
- })
- ADD_REQUEST(CommitTransaction, CommitTransactionRequest, CommitTransactionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvCommitTransactionRequest(ctx));
- })
- ADD_REQUEST(RollbackTransaction, RollbackTransactionRequest, RollbackTransactionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvRollbackTransactionRequest(ctx));
- })
- ADD_REQUEST(DescribeTableOptions, DescribeTableOptionsRequest, DescribeTableOptionsResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDescribeTableOptionsRequest(ctx));
- })
- ADD_REQUEST(BulkUpsert, BulkUpsertRequest, BulkUpsertResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvBulkUpsertRequest(ctx));
- })
- ADD_REQUEST(StreamExecuteScanQuery, ExecuteScanQueryRequest, ExecuteScanQueryPartialResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvStreamExecuteScanQueryRequest(ctx));
- })
-
-#undef ADD_REQUEST
+
+#ifdef ADD_STREAM_REQUEST_LIMIT
+#error ADD_STREAM_REQUEST_LIMIT macro already defined
+#endif
+
+#define ADD_REQUEST_LIMIT(NAME, CB, LIMIT_TYPE) \
+ MakeIntrusive<TGRpcRequest<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response, TGRpcYdbTableService>> \
+ (this, &Service_, CQ_, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new TGrpcRequestOperationCall<Ydb::Table::NAME##Request, Ydb::Table::NAME##Response> \
+ (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::LIMIT_TYPE, nullptr})); \
+ }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
+ #NAME, logger, getCounterBlock("scheme", #NAME))->Run();
+
+#define ADD_STREAM_REQUEST_LIMIT(NAME, IN, OUT, CB, LIMIT_TYPE) \
+ MakeIntrusive<TGRpcRequest<Ydb::Table::IN, Ydb::Table::OUT, TGRpcYdbTableService>> \
+ (this, &Service_, CQ_, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new TGrpcRequestNoOperationCall<Ydb::Table::IN, Ydb::Table::OUT> \
+ (ctx, &CB, TRequestAuxSettings{TRateLimiterMode::LIMIT_TYPE, nullptr})); \
+ }, &Ydb::Table::V1::TableService::AsyncService::Request ## NAME, \
+ #NAME, logger, getCounterBlock("scheme", #NAME))->Run();
+
+ ADD_REQUEST_LIMIT(CreateSession, DoCreateSessionRequest, Rps)
+ ADD_REQUEST_LIMIT(KeepAlive, DoKeepAliveRequest, Rps)
+ ADD_REQUEST_LIMIT(AlterTable, DoAlterTableRequest, Rps)
+ ADD_REQUEST_LIMIT(CreateTable, DoCreateTableRequest, Rps)
+ ADD_REQUEST_LIMIT(DropTable, DoDropTableRequest, Rps)
+ ADD_REQUEST_LIMIT(DescribeTable, DoDescribeTableRequest, Rps)
+ ADD_REQUEST_LIMIT(CopyTable, DoCopyTableRequest, Rps)
+ ADD_REQUEST_LIMIT(CopyTables, DoCopyTablesRequest, Rps)
+ ADD_REQUEST_LIMIT(RenameTables, DoRenameTablesRequest, Rps)
+ ADD_REQUEST_LIMIT(ExplainDataQuery, DoExplainDataQueryRequest, Rps)
+ ADD_REQUEST_LIMIT(ExecuteSchemeQuery, DoExecuteSchemeQueryRequest, Rps)
+ ADD_REQUEST_LIMIT(BeginTransaction, DoBeginTransactionRequest, Rps)
+ ADD_REQUEST_LIMIT(DescribeTableOptions, DoDescribeTableOptionsRequest, Rps)
+
+ ADD_REQUEST_LIMIT(DeleteSession, DoDeleteSessionRequest, Off)
+ ADD_REQUEST_LIMIT(CommitTransaction, DoCommitTransactionRequest, Off)
+ ADD_REQUEST_LIMIT(RollbackTransaction, DoRollbackTransactionRequest, Off)
+
+
+ ADD_REQUEST_LIMIT(PrepareDataQuery, DoPrepareDataQueryRequest, Ru)
+ ADD_REQUEST_LIMIT(ExecuteDataQuery, DoExecuteDataQueryRequest, Ru)
+ ADD_REQUEST_LIMIT(BulkUpsert, DoBulkUpsertRequest, Ru)
+
+ ADD_STREAM_REQUEST_LIMIT(StreamExecuteScanQuery, ExecuteScanQueryRequest, ExecuteScanQueryPartialResponse, DoExecuteScanQueryRequest, RuOnProgress)
+ ADD_STREAM_REQUEST_LIMIT(StreamReadTable, ReadTableRequest, ReadTableResponse, DoReadTableRequest, RuOnProgress)
+
+#undef ADD_REQUEST_LIMIT
+#undef ADD_STREAM_REQUEST_LIMIT
}
} // namespace NGRpcService