diff options
author | alexnick <alexnick@yandex-team.ru> | 2022-06-06 14:41:13 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-06-06 14:41:13 +0300 |
commit | a2c8f380bc23c5c6b73d8049951b7e6f37cac904 (patch) | |
tree | 54a10681713a4159c437dd8123f2c577a1c01971 | |
parent | 349643054488818105f9d79204fd40988347d340 (diff) | |
download | ydb-a2c8f380bc23c5c6b73d8049951b7e6f37cac904.tar.gz |
KIKIMR-14665 inside ydb use only local rpc calls
KIKIMR-14665 inside ydb use only local rpc calls
REVIEW: 2485498
Invert grpc_request_proxy dependence for datastream service. KIKIMR-13646
REVIEW: 2396725
REVIEW: 2488385
x-ydb-stable-ref: f5aed4ff2239e96f0fcd0bb4a4e060858d680ad8
29 files changed, 928 insertions, 1820 deletions
diff --git a/ydb/core/base/ticket_parser.h b/ydb/core/base/ticket_parser.h index 1514c74a05..5ebde54b6c 100644 --- a/ydb/core/base/ticket_parser.h +++ b/ydb/core/base/ticket_parser.h @@ -38,6 +38,18 @@ namespace NKikimr { // only one of them will be processed. Which one is not guaranteed const std::vector<TEntry> Entries; + struct TAccessKeySignature { + TString AccessKeyId; + TString StringToSign; + TString Signature; + + TInstant SignedAt; + TString Service; + TString Region; + }; + + const TAccessKeySignature Signature; + struct TInitializationFields { TString Database; TString Ticket; @@ -83,6 +95,14 @@ namespace NKikimr { , PeerName(peerName) , Entries(entries) {} + + TEvAuthorizeTicket(TAccessKeySignature&& sign, const TString& peerName, const TVector<TEntry>& entries) + : Ticket("") + , PeerName(peerName) + , Entries(entries) + , Signature(std::move(sign)) + {} + }; struct TError { @@ -162,3 +182,14 @@ template <> inline void Out<NKikimr::TEvTicketParser::TError>(IOutputStream& str, const NKikimr::TEvTicketParser::TError& error) { str << error.Message; } + +namespace NKikimr { +namespace NGRpcService { + +class ICheckerIface { +public: + virtual void SetEntries(const TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry>& entries) = 0; +}; + +} +} diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 44b25c4a5f..5c5d59dc8b 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -1024,20 +1024,65 @@ private: TMaybe<NRpcService::TRlPath> RlPath; }; +template<ui32 TRpcId, typename TReq, typename TResp, bool IsOperation, typename TDerived> +class TGRpcRequestValidationWrapperImpl : + public TGRpcRequestWrapperImpl<TRpcId, TReq, TResp, IsOperation, TDerived> { + +public: + static IActor* CreateRpcActor(typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type* msg); + TGRpcRequestValidationWrapperImpl(NGrpc::IRequestContextBase* ctx) + : TGRpcRequestWrapperImpl<TRpcId, TReq, TResp, IsOperation, TDerived>(ctx) + { } + + bool Validate(TString& error) override { + return this->GetProtoRequest()->validate(error); + } +}; + +// SFINAE +// Check protobuf has validate feature +template<typename TProto> +struct TProtoHasValidate { +private: + static int Detect(...); + // validate function has prototype: bool validate(TProtoStringType&) const + static TProtoStringType Dummy_; + template<typename U> + static decltype(std::declval<U>().validate(Dummy_)) Detect(const U&); +public: + static constexpr bool Value = std::is_same<bool, decltype(Detect(std::declval<TProto>()))>::value; +}; + class IFacilityProvider; template <typename TReq, typename TResp, bool IsOperation> class TGrpcRequestCall - : public TGRpcRequestWrapperImpl< - TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, IsOperation, TGrpcRequestCall<TReq, TResp, IsOperation>> { + : public std::conditional_t<TProtoHasValidate<TReq>::Value, + TGRpcRequestValidationWrapperImpl< + TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, IsOperation, TGrpcRequestCall<TReq, TResp, IsOperation>>, + TGRpcRequestWrapperImpl< + TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, IsOperation, TGrpcRequestCall<TReq, TResp, IsOperation>>> + { typedef typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type TRequestIface; public: static constexpr bool IsOp = IsOperation; + static IActor* CreateRpcActor(typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type* msg); + using TBase = std::conditional_t<TProtoHasValidate<TReq>::Value, + TGRpcRequestValidationWrapperImpl< + TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, IsOperation, TGrpcRequestCall<TReq, TResp, IsOperation>>, + TGRpcRequestWrapperImpl< + TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, IsOperation, TGrpcRequestCall<TReq, TResp, IsOperation>>>; + TGrpcRequestCall(NGrpc::IRequestContextBase* ctx, void (*cb)(std::unique_ptr<TRequestIface>, const IFacilityProvider&), TRequestAuxSettings auxSettings = {}) - : TGRpcRequestWrapperImpl< - TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, IsOperation, - TGrpcRequestCall<TReq, TResp, IsOperation>>(ctx) + : TBase(ctx) + , PassMethod(cb) + , AuxSettings(std::move(auxSettings)) + { } + + TGrpcRequestCall(NGrpc::IRequestContextBase* ctx, + std::function<void(std::unique_ptr<TRequestIface>, const IFacilityProvider&)> cb, TRequestAuxSettings auxSettings = {}) + : TBase(ctx) , PassMethod(cb) , AuxSettings(std::move(auxSettings)) { } @@ -1061,7 +1106,7 @@ public: } } private: - void (*PassMethod)(std::unique_ptr<TRequestIface>, const IFacilityProvider&); + std::function<void(std::unique_ptr<TRequestIface>, const IFacilityProvider&)> PassMethod; const TRequestAuxSettings AuxSettings; }; diff --git a/ydb/core/grpc_services/grpc_request_check_actor.h b/ydb/core/grpc_services/grpc_request_check_actor.h index 6c4be43394..9ee1b7a3f3 100644 --- a/ydb/core/grpc_services/grpc_request_check_actor.h +++ b/ydb/core/grpc_services/grpc_request_check_actor.h @@ -18,11 +18,6 @@ namespace NKikimr { namespace NGRpcService { -class ICheckerIface { -public: - void SetEntries(const TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry>& entries); -}; - template <typename TEvent> class TGrpcRequestCheckActor : public TActorBootstrappedSecureRequest<TGrpcRequestCheckActor<TEvent>> @@ -72,8 +67,6 @@ public: TBase::SetEntries(entries); } - void InitializeAttributesFromYandexQuery(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - void InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); TGrpcRequestCheckActor( @@ -424,71 +417,6 @@ void TGrpcRequestCheckActor<TEvent>::InitializeAttributes(const TSchemeBoardEven InitializeAttributesFromSchema(schemeData); } -// yq behavior -template <> -void TGrpcRequestCheckActor<TEvYandexQueryCreateQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryListQueriesRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryDescribeQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryGetQueryStatusRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryModifyQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryDeleteQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryControlQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryGetResultDataRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryListJobsRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryDescribeJobRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryCreateConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryListConnectionsRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryDescribeConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryModifyConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryDeleteConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryTestConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryCreateBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryListBindingsRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryDescribeBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryModifyBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryDeleteBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData); - - // default permissions template <typename TEvent> const TVector<TString>& TGrpcRequestCheckActor<TEvent>::GetPermissions() { @@ -500,85 +428,6 @@ const TVector<TString>& TGrpcRequestCheckActor<TEvent>::GetPermissions() { return permissions; } -// yds behavior -template <> -inline const TVector<TString>& TGrpcRequestCheckActor<TEvDataStreamsPutRecordRequest>::GetPermissions() { - //full list of permissions for compatility. remove old permissions later. - static const TVector<TString> permissions = {"ydb.streams.write", "ydb.databases.list", "ydb.databases.create", "ydb.databases.connect"}; - return permissions; -} -// yds behavior -template <> -inline const TVector<TString>& TGrpcRequestCheckActor<TEvDataStreamsPutRecordsRequest>::GetPermissions() { - //full list of permissions for compatility. remove old permissions later. - static const TVector<TString> permissions = {"ydb.streams.write", "ydb.databases.list", "ydb.databases.create", "ydb.databases.connect"}; - return permissions; -} - -// yq behavior -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryCreateQueryRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListQueriesRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeQueryRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryGetQueryStatusRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryModifyQueryRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDeleteQueryRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryControlQueryRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryGetResultDataRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListJobsRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeJobRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryCreateConnectionRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListConnectionsRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeConnectionRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryModifyConnectionRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDeleteConnectionRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryTestConnectionRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryCreateBindingRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListBindingsRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeBindingRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryModifyBindingRequest>::GetPermissions(); - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDeleteBindingRequest>::GetPermissions(); - template <typename TEvent> IActor* CreateGrpcRequestCheckActor( const TActorId& owner, diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index fd8c78ab94..c44c7f8f6d 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -640,58 +640,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo HFunc(TEvLongTxRollbackRequest, PreHandle); HFunc(TEvLongTxWriteRequest, PreHandle); HFunc(TEvLongTxReadRequest, PreHandle); - HFunc(TEvDataStreamsCreateStreamRequest, PreHandle); - HFunc(TEvDataStreamsDeleteStreamRequest, PreHandle); - HFunc(TEvDataStreamsDescribeStreamRequest, PreHandle); - HFunc(TEvDataStreamsRegisterStreamConsumerRequest, PreHandle); - HFunc(TEvDataStreamsDeregisterStreamConsumerRequest, PreHandle); - HFunc(TEvDataStreamsDescribeStreamConsumerRequest, PreHandle); - HFunc(TEvDataStreamsPutRecordRequest, PreHandle); - HFunc(TEvDataStreamsListStreamsRequest, PreHandle); - HFunc(TEvDataStreamsListShardsRequest, PreHandle); - HFunc(TEvDataStreamsPutRecordsRequest, PreHandle); - HFunc(TEvDataStreamsGetRecordsRequest, PreHandle); - HFunc(TEvDataStreamsGetShardIteratorRequest, PreHandle); - HFunc(TEvDataStreamsSubscribeToShardRequest, PreHandle); - HFunc(TEvDataStreamsDescribeLimitsRequest, PreHandle); - HFunc(TEvDataStreamsDescribeStreamSummaryRequest, PreHandle); - HFunc(TEvDataStreamsDecreaseStreamRetentionPeriodRequest, PreHandle); - HFunc(TEvDataStreamsIncreaseStreamRetentionPeriodRequest, PreHandle); - HFunc(TEvDataStreamsUpdateShardCountRequest, PreHandle); - HFunc(TEvDataStreamsUpdateStreamRequest, PreHandle); - HFunc(TEvDataStreamsSetWriteQuotaRequest, PreHandle); - HFunc(TEvDataStreamsListStreamConsumersRequest, PreHandle); - HFunc(TEvDataStreamsAddTagsToStreamRequest, PreHandle); - HFunc(TEvDataStreamsDisableEnhancedMonitoringRequest, PreHandle); - HFunc(TEvDataStreamsEnableEnhancedMonitoringRequest, PreHandle); - HFunc(TEvDataStreamsListTagsForStreamRequest, PreHandle); - HFunc(TEvDataStreamsMergeShardsRequest, PreHandle); - HFunc(TEvDataStreamsRemoveTagsFromStreamRequest, PreHandle); - HFunc(TEvDataStreamsSplitShardRequest, PreHandle); - HFunc(TEvDataStreamsStartStreamEncryptionRequest, PreHandle); - HFunc(TEvDataStreamsStopStreamEncryptionRequest, PreHandle); - HFunc(TEvYandexQueryCreateQueryRequest, PreHandle); - HFunc(TEvYandexQueryListQueriesRequest, PreHandle); - HFunc(TEvYandexQueryDescribeQueryRequest, PreHandle); - HFunc(TEvYandexQueryGetQueryStatusRequest, PreHandle); - HFunc(TEvYandexQueryModifyQueryRequest, PreHandle); - HFunc(TEvYandexQueryDeleteQueryRequest, PreHandle); - HFunc(TEvYandexQueryControlQueryRequest, PreHandle); - HFunc(TEvYandexQueryGetResultDataRequest, PreHandle); - HFunc(TEvYandexQueryListJobsRequest, PreHandle); - HFunc(TEvYandexQueryDescribeJobRequest, PreHandle); - HFunc(TEvYandexQueryCreateConnectionRequest, PreHandle); - HFunc(TEvYandexQueryListConnectionsRequest, PreHandle); - HFunc(TEvYandexQueryDescribeConnectionRequest, PreHandle); - HFunc(TEvYandexQueryModifyConnectionRequest, PreHandle); - HFunc(TEvYandexQueryDeleteConnectionRequest, PreHandle); - HFunc(TEvYandexQueryTestConnectionRequest, PreHandle); - HFunc(TEvYandexQueryCreateBindingRequest, PreHandle); - HFunc(TEvYandexQueryListBindingsRequest, PreHandle); - HFunc(TEvYandexQueryDescribeBindingRequest, PreHandle); - HFunc(TEvYandexQueryModifyBindingRequest, PreHandle); - HFunc(TEvYandexQueryDeleteBindingRequest, PreHandle); - HFunc(TEvProxyRuntimeEvent, PreHandle); default: diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h index d1d1981cfe..78dedf49d2 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.h +++ b/ydb/core/grpc_services/grpc_request_proxy.h @@ -121,57 +121,6 @@ protected: void Handle(TEvLongTxRollbackRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvLongTxWriteRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvLongTxReadRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsCreateStreamRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsDeleteStreamRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsDescribeStreamRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsRegisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsDeregisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsDescribeStreamConsumerRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsPutRecordRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsListStreamsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsListShardsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsPutRecordsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsGetRecordsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsGetShardIteratorRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsSubscribeToShardRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsDescribeLimitsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsDescribeStreamSummaryRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsDecreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsIncreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsUpdateShardCountRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsUpdateStreamRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsSetWriteQuotaRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsListStreamConsumersRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsAddTagsToStreamRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsDisableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsEnableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsListTagsForStreamRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsMergeShardsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsRemoveTagsFromStreamRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsSplitShardRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsStartStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvDataStreamsStopStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryCreateQueryRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryListQueriesRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryDescribeQueryRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryGetQueryStatusRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryModifyQueryRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryDeleteQueryRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryControlQueryRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryGetResultDataRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryListJobsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryDescribeJobRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryCreateConnectionRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryListConnectionsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryDescribeConnectionRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryModifyConnectionRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryDeleteConnectionRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryTestConnectionRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryCreateBindingRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryListBindingsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryDescribeBindingRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryModifyBindingRequest::TPtr& ev, const TActorContext& ctx); - void Handle(TEvYandexQueryDeleteBindingRequest::TPtr& ev, const TActorContext& ctx); TActorId DiscoveryCacheActorID; }; diff --git a/ydb/core/grpc_services/rpc_analytics_internal.cpp b/ydb/core/grpc_services/rpc_analytics_internal.cpp index ae3ac7bc59..48227a57fd 100644 --- a/ydb/core/grpc_services/rpc_analytics_internal.cpp +++ b/ydb/core/grpc_services/rpc_analytics_internal.cpp @@ -26,10 +26,7 @@ namespace { template <typename TEv, typename TReq> void SendResponse(const TEv& ev, TReq& req) { if (!ev->Get()->Record) { - for (const auto& x : ev->Get()->Issues) { - req.RaiseIssue(x); - } - + req.RaiseIssues(ev->Get()->Issues); req.ReplyWithYdbStatus(ev->Get()->Status); } else { req.SendResult(*ev->Get()->Record, ev->Get()->Status); diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h index c1b396ae05..e866c9023a 100644 --- a/ydb/core/grpc_services/rpc_calls.h +++ b/ydb/core/grpc_services/rpc_calls.h @@ -24,7 +24,6 @@ #include <ydb/public/api/grpc/draft/dummy.pb.h> #include <ydb/public/api/grpc/draft/ydb_long_tx_v1.pb.h> -#include <ydb/public/api/grpc/draft/ydb_datastreams_v1.pb.h> #include <ydb/public/lib/operation_id/operation_id.h> @@ -119,57 +118,6 @@ using TEvLongTxCommitRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxCommit, using TEvLongTxRollbackRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxRollback, Ydb::LongTx::RollbackTransactionRequest, Ydb::LongTx::RollbackTransactionResponse, true>; using TEvLongTxWriteRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxWrite, Ydb::LongTx::WriteRequest, Ydb::LongTx::WriteResponse, true>; using TEvLongTxReadRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxRead, Ydb::LongTx::ReadRequest, Ydb::LongTx::ReadResponse, true>; -using TEvDataStreamsCreateStreamRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsCreateStream, Ydb::DataStreams::V1::CreateStreamRequest, Ydb::DataStreams::V1::CreateStreamResponse, true>; -using TEvDataStreamsDeleteStreamRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDeleteStream, Ydb::DataStreams::V1::DeleteStreamRequest, Ydb::DataStreams::V1::DeleteStreamResponse, true>; -using TEvDataStreamsDescribeStreamRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDescribeStream, Ydb::DataStreams::V1::DescribeStreamRequest, Ydb::DataStreams::V1::DescribeStreamResponse, true>; -using TEvDataStreamsRegisterStreamConsumerRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsRegisterStreamConsumer, Ydb::DataStreams::V1::RegisterStreamConsumerRequest, Ydb::DataStreams::V1::RegisterStreamConsumerResponse, true>; -using TEvDataStreamsDeregisterStreamConsumerRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDeregisterStreamConsumer, Ydb::DataStreams::V1::DeregisterStreamConsumerRequest, Ydb::DataStreams::V1::DeregisterStreamConsumerResponse, true>; -using TEvDataStreamsDescribeStreamConsumerRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDescribeStreamConsumer, Ydb::DataStreams::V1::DescribeStreamConsumerRequest, Ydb::DataStreams::V1::DescribeStreamConsumerResponse, true>; -using TEvDataStreamsPutRecordRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsPutRecord, Ydb::DataStreams::V1::PutRecordRequest, Ydb::DataStreams::V1::PutRecordResponse, true>; -using TEvDataStreamsListStreamsRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsListStreams, Ydb::DataStreams::V1::ListStreamsRequest, Ydb::DataStreams::V1::ListStreamsResponse, true>; -using TEvDataStreamsListShardsRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsListShards, Ydb::DataStreams::V1::ListShardsRequest, Ydb::DataStreams::V1::ListShardsResponse, true>; -using TEvDataStreamsPutRecordsRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsPutRecords, Ydb::DataStreams::V1::PutRecordsRequest, Ydb::DataStreams::V1::PutRecordsResponse, true>; -using TEvDataStreamsGetRecordsRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsGetRecords, Ydb::DataStreams::V1::GetRecordsRequest, Ydb::DataStreams::V1::GetRecordsResponse, true>; -using TEvDataStreamsGetShardIteratorRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsGetShardIterator, Ydb::DataStreams::V1::GetShardIteratorRequest, Ydb::DataStreams::V1::GetShardIteratorResponse, true>; -using TEvDataStreamsSubscribeToShardRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsSubscribeToShard, Ydb::DataStreams::V1::SubscribeToShardRequest, Ydb::DataStreams::V1::SubscribeToShardResponse, true>; -using TEvDataStreamsDescribeLimitsRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDescribeLimits, Ydb::DataStreams::V1::DescribeLimitsRequest, Ydb::DataStreams::V1::DescribeLimitsResponse, true>; -using TEvDataStreamsDescribeStreamSummaryRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDescribeStreamSummary, Ydb::DataStreams::V1::DescribeStreamSummaryRequest, Ydb::DataStreams::V1::DescribeStreamSummaryResponse, true>; -using TEvDataStreamsDecreaseStreamRetentionPeriodRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDecreaseStreamRetentionPeriod, Ydb::DataStreams::V1::DecreaseStreamRetentionPeriodRequest, Ydb::DataStreams::V1::DecreaseStreamRetentionPeriodResponse, true>; -using TEvDataStreamsIncreaseStreamRetentionPeriodRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsIncreaseStreamRetentionPeriod, Ydb::DataStreams::V1::IncreaseStreamRetentionPeriodRequest, Ydb::DataStreams::V1::IncreaseStreamRetentionPeriodResponse, true>; -using TEvDataStreamsUpdateShardCountRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsUpdateShardCount, Ydb::DataStreams::V1::UpdateShardCountRequest, Ydb::DataStreams::V1::UpdateShardCountResponse, true>; -using TEvDataStreamsUpdateStreamRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsUpdateStream, Ydb::DataStreams::V1::UpdateStreamRequest, Ydb::DataStreams::V1::UpdateStreamResponse, true>; -using TEvDataStreamsSetWriteQuotaRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsSetWriteQuota, Ydb::DataStreams::V1::SetWriteQuotaRequest, Ydb::DataStreams::V1::SetWriteQuotaResponse, true>; -using TEvDataStreamsListStreamConsumersRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsListStreamConsumers, Ydb::DataStreams::V1::ListStreamConsumersRequest, Ydb::DataStreams::V1::ListStreamConsumersResponse, true>; -using TEvDataStreamsAddTagsToStreamRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsAddTagsToStream, Ydb::DataStreams::V1::AddTagsToStreamRequest, Ydb::DataStreams::V1::AddTagsToStreamResponse, true>; -using TEvDataStreamsDisableEnhancedMonitoringRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDisableEnhancedMonitoring, Ydb::DataStreams::V1::DisableEnhancedMonitoringRequest, Ydb::DataStreams::V1::DisableEnhancedMonitoringResponse, true>; -using TEvDataStreamsEnableEnhancedMonitoringRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsEnableEnhancedMonitoring, Ydb::DataStreams::V1::EnableEnhancedMonitoringRequest, Ydb::DataStreams::V1::EnableEnhancedMonitoringResponse, true>; -using TEvDataStreamsListTagsForStreamRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsListTagsForStream, Ydb::DataStreams::V1::ListTagsForStreamRequest, Ydb::DataStreams::V1::ListTagsForStreamResponse, true>; -using TEvDataStreamsMergeShardsRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsMergeShards, Ydb::DataStreams::V1::MergeShardsRequest, Ydb::DataStreams::V1::MergeShardsResponse, true>; -using TEvDataStreamsRemoveTagsFromStreamRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsRemoveTagsFromStream, Ydb::DataStreams::V1::RemoveTagsFromStreamRequest, Ydb::DataStreams::V1::RemoveTagsFromStreamResponse, true>; -using TEvDataStreamsSplitShardRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsSplitShard, Ydb::DataStreams::V1::SplitShardRequest, Ydb::DataStreams::V1::SplitShardResponse, true>; -using TEvDataStreamsStartStreamEncryptionRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsStartStreamEncryption, Ydb::DataStreams::V1::StartStreamEncryptionRequest, Ydb::DataStreams::V1::StartStreamEncryptionResponse, true>; -using TEvDataStreamsStopStreamEncryptionRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsStopStreamEncryption, Ydb::DataStreams::V1::StopStreamEncryptionRequest, Ydb::DataStreams::V1::StopStreamEncryptionResponse, true>; -using TEvYandexQueryCreateQueryRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryCreateQuery, YandexQuery::CreateQueryRequest, YandexQuery::CreateQueryResponse, true>; -using TEvYandexQueryListQueriesRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryListQueries, YandexQuery::ListQueriesRequest, YandexQuery::ListQueriesResponse, true>; -using TEvYandexQueryDescribeQueryRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryDescribeQuery, YandexQuery::DescribeQueryRequest, YandexQuery::DescribeQueryResponse, true>; -using TEvYandexQueryGetQueryStatusRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryGetQueryStatus, YandexQuery::GetQueryStatusRequest, YandexQuery::GetQueryStatusResponse, true>; -using TEvYandexQueryModifyQueryRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryModifyQuery, YandexQuery::ModifyQueryRequest, YandexQuery::ModifyQueryResponse, true>; -using TEvYandexQueryDeleteQueryRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryDeleteQuery, YandexQuery::DeleteQueryRequest, YandexQuery::DeleteQueryResponse, true>; -using TEvYandexQueryControlQueryRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryControlQuery, YandexQuery::ControlQueryRequest, YandexQuery::ControlQueryResponse, true>; -using TEvYandexQueryGetResultDataRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryGetResultData, YandexQuery::GetResultDataRequest, YandexQuery::GetResultDataResponse, true>; -using TEvYandexQueryListJobsRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryListJobs, YandexQuery::ListJobsRequest, YandexQuery::ListJobsResponse, true>; -using TEvYandexQueryDescribeJobRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryDescribeJob, YandexQuery::DescribeJobRequest, YandexQuery::DescribeJobResponse, true>; -using TEvYandexQueryCreateConnectionRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryCreateConnection, YandexQuery::CreateConnectionRequest, YandexQuery::CreateConnectionResponse, true>; -using TEvYandexQueryListConnectionsRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryListConnections, YandexQuery::ListConnectionsRequest, YandexQuery::ListConnectionsResponse, true>; -using TEvYandexQueryDescribeConnectionRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryDescribeConnection, YandexQuery::DescribeConnectionRequest, YandexQuery::DescribeConnectionResponse, true>; -using TEvYandexQueryModifyConnectionRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryModifyConnection, YandexQuery::ModifyConnectionRequest, YandexQuery::ModifyConnectionResponse, true>; -using TEvYandexQueryDeleteConnectionRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryDeleteConnection, YandexQuery::DeleteConnectionRequest, YandexQuery::DeleteConnectionResponse, true>; -using TEvYandexQueryTestConnectionRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryTestConnection, YandexQuery::TestConnectionRequest, YandexQuery::TestConnectionResponse, true>; -using TEvYandexQueryCreateBindingRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryCreateBinding, YandexQuery::CreateBindingRequest, YandexQuery::CreateBindingResponse, true>; -using TEvYandexQueryListBindingsRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryListBindings, YandexQuery::ListBindingsRequest, YandexQuery::ListBindingsResponse, true>; -using TEvYandexQueryDescribeBindingRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryDescribeBinding, YandexQuery::DescribeBindingRequest, YandexQuery::DescribeBindingResponse, true>; -using TEvYandexQueryModifyBindingRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryModifyBinding, YandexQuery::ModifyBindingRequest, YandexQuery::ModifyBindingResponse, true>; -using TEvYandexQueryDeleteBindingRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryDeleteBinding, YandexQuery::DeleteBindingRequest, YandexQuery::DeleteBindingResponse, true>; } // namespace NGRpcService } // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_yq.cpp b/ydb/core/grpc_services/rpc_yq.cpp index 6e832ef5c7..d515056c3e 100644 --- a/ydb/core/grpc_services/rpc_yq.cpp +++ b/ydb/core/grpc_services/rpc_yq.cpp @@ -1,10 +1,7 @@ -#include "grpc_request_proxy.h" -#include "grpc_request_check_actor.h" - -#include "rpc_calls.h" #include "rpc_common.h" #include "rpc_deferrable.h" +#include <ydb/core/grpc_services/service_yq.h> #include <ydb/core/yq/libs/audit/events/events.h> #include <ydb/core/yq/libs/audit/yq_audit_service.h> #include <ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h> @@ -23,11 +20,21 @@ namespace NGRpcService { using namespace Ydb; -template<class TEvRequest, class TEvImplRequest, class TEvImplResponse, class TResult> -class TYandexQueryRPC : public TRpcOperationRequestActor<TYandexQueryRPC<TEvRequest, TEvImplRequest, TEvImplResponse, TResult>, TEvRequest> { -private: - const TVector<TString>& Sids; +template <typename RpcRequestType, typename EvRequestType, typename EvResponseType> +class TYandexQueryRequestRPC : public TRpcOperationRequestActor< + TYandexQueryRequestRPC<RpcRequestType,EvRequestType,EvResponseType>, RpcRequestType> { + +public: + using TBase = TRpcOperationRequestActor< + TYandexQueryRequestRPC<RpcRequestType,EvRequestType,EvResponseType>, + RpcRequestType>; + using TBase::Become; + using TBase::Send; + using TBase::PassAway; + using TBase::Request_; + using TBase::GetProtoRequest; +private: TString Token; TString FolderId; TString User; @@ -36,68 +43,58 @@ private: TString RequestId; public: - using TBase = TRpcOperationRequestActor<TYandexQueryRPC<TEvRequest, TEvImplRequest, TEvImplResponse, TResult>, TEvRequest>; - using TBase::TBase; - using TBase::GetProtoRequest; - using TBase::Request_; - using TBase::PassAway; - using TBase::Send; - using TBase::Become; - - TYandexQueryRPC(IRequestOpCtx* request, const TVector<TString>& sids) - : TBase(request) - , Sids(sids) - { } + TYandexQueryRequestRPC(IRequestOpCtx* request) + : TBase(request) {} void Bootstrap() { - auto requestQuery = dynamic_cast<TEvRequest*>(Request_.get()); - Y_VERIFY(requestQuery); + auto requestCtx = Request_.get(); + + auto request = dynamic_cast<RpcRequestType*>(requestCtx); + Y_VERIFY(request); + + auto proxyCtx = dynamic_cast<IRequestProxyCtx*>(requestCtx); + Y_VERIFY(proxyCtx); - PeerName = requestQuery->GetPeerName(); - UserAgent = requestQuery->GetPeerMetaValues("user-agent").GetOrElse("empty"); - RequestId = requestQuery->GetPeerMetaValues("x-request-id").GetOrElse(CreateGuidAsString()); + PeerName = Request_->GetPeerName(); + UserAgent = Request_->GetPeerMetaValues("user-agent").GetOrElse("empty"); + RequestId = Request_->GetPeerMetaValues("x-request-id").GetOrElse(CreateGuidAsString()); - TMaybe<TString> authToken = requestQuery->GetYdbToken(); + TMaybe<TString> authToken = proxyCtx->GetYdbToken(); if (!authToken) { - Request_->RaiseIssue(NYql::TIssue("token is empty")); - ReplyWithResult(StatusIds::BAD_REQUEST); + ReplyWithStatus("token is empty", StatusIds::BAD_REQUEST); return; } Token = *authToken; - const TString scope = requestQuery->GetPeerMetaValues("x-yq-scope").GetOrElse(""); + const TString scope = Request_->GetPeerMetaValues("x-yq-scope").GetOrElse(""); if (!scope.StartsWith("yandexcloud://")) { - Request_->RaiseIssue(NYql::TIssue("x-yq-scope should start with yandexcloud:// but got " + scope)); - ReplyWithResult(StatusIds::BAD_REQUEST); + ReplyWithStatus("x-yq-scope should start with yandexcloud:// but got " + scope, StatusIds::BAD_REQUEST); return; } const TVector<TString> path = StringSplitter(scope).Split('/').SkipEmpty(); - if (path.size() != 2 && path.size() != 3) { // todo: do not check against 3, backward compatibility - Request_->RaiseIssue(NYql::TIssue("x-yq-scope format is invalid. Must be yandexcloud://folder_id, but got " + scope)); - ReplyWithResult(StatusIds::BAD_REQUEST); + if (path.size() != 2) { + ReplyWithStatus("x-yq-scope format is invalid. Must be yandexcloud://folder_id, but got " + scope, StatusIds::BAD_REQUEST); return; } FolderId = path.back(); if (!FolderId) { - Request_->RaiseIssue(NYql::TIssue("folder id is empty")); - ReplyWithResult(StatusIds::BAD_REQUEST); + ReplyWithStatus("folder id is empty", StatusIds::BAD_REQUEST); return; } if (FolderId.length() > 1024) { - Request_->RaiseIssue(NYql::TIssue("folder id length greater than 1024 characters: " + FolderId)); - ReplyWithResult(StatusIds::BAD_REQUEST); + ReplyWithStatus("folder id length greater than 1024 characters: " + FolderId, StatusIds::BAD_REQUEST); return; } - const TString& internalToken = requestQuery->GetInternalToken(); + const TString& internalToken = proxyCtx->GetInternalToken(); TVector<TString> permissions; if (internalToken) { NACLib::TUserToken userToken(internalToken); User = userToken.GetUserSID(); - for (const auto& sid: Sids) { + for (const auto& sid: request->Sids) { if (userToken.IsExist(sid)) { permissions.push_back(sid); } @@ -105,40 +102,45 @@ public: } if (!User) { - Request_->RaiseIssue(NYql::TIssue("Authorization error. Permission denied")); - ReplyWithResult(StatusIds::UNAUTHORIZED); + ReplyWithStatus("Authorization error. Permission denied", StatusIds::UNAUTHORIZED); return; } - auto request = std::make_unique<TEvImplRequest>(FolderId, *GetProtoRequest(), User, Token, permissions); - Send(NYq::ControlPlaneProxyActorId(), request.release()); - - Become(&TYandexQueryRPC::StateFunc); + const auto req = GetProtoRequest(); + auto ev = MakeHolder<EvRequestType>(FolderId, *req, User, Token, permissions); + Send(NYq::ControlPlaneProxyActorId(), ev.Release()); + Become(&TYandexQueryRequestRPC<RpcRequestType, EvRequestType, EvResponseType>::StateFunc); } + void ReplyWithStatus(const TString& issueMessage, StatusIds::StatusCode status) { + Request_->RaiseIssue(NYql::TIssue(issueMessage)); + Request_->ReplyWithYdbStatus(status); + PassAway(); + } + private: STRICT_STFUNC(StateFunc, - hFunc(TEvImplResponse, Handler); + hFunc(EvResponseType, Handle); ) - template<typename T> - void ProcessResponse(const T& response) { + template <typename TResponse, typename TReq> + void SendResponse(const TResponse& response, TReq& req) { if (response.Issues) { - Request_->RaiseIssues(response.Issues); - ReplyWithResult(StatusIds::BAD_REQUEST); + req.RaiseIssues(response.Issues); + req.ReplyWithYdbStatus(StatusIds::BAD_REQUEST); } else { - ReplyWithResult(StatusIds::SUCCESS, response.Result); - } + req.SendResult(response.Result, StatusIds::SUCCESS); + } } - template<typename T> requires requires (T t) { t.AuditDetails; } - void ProcessResponse(const T& response) { + template <typename TResponse, typename TReq> requires requires (TResponse r) { r.AuditDetails; } + void SendResponse(const TResponse& response, TReq& req) { if (response.Issues) { - Request_->RaiseIssues(response.Issues); - ReplyWithResult(StatusIds::BAD_REQUEST); + req.RaiseIssues(response.Issues); + req.ReplyWithYdbStatus(StatusIds::BAD_REQUEST); } else { - ReplyWithResult(StatusIds::SUCCESS, response.Result); - } + req.SendResult(response.Result, StatusIds::SUCCESS); + } NYq::TEvAuditService::TExtraInfo extraInfo{ .Token = Token, @@ -156,719 +158,199 @@ private: response.AuditDetails)); } - void Handler(typename TEvImplResponse::TPtr& ev) { - const auto& response = *ev->Get(); - ProcessResponse(response); - } - - void ReplyWithResult(StatusIds::StatusCode status) { - Request_->ReplyWithYdbStatus(status); - PassAway(); - } - - void ReplyWithResult(StatusIds::StatusCode status, const TResult& result) { - Request_->SendResult(result, status); + void Handle(typename EvResponseType::TPtr& ev) { + SendResponse(*ev->Get(), *Request_); PassAway(); } }; -void TGRpcRequestProxy::Handle(TEvYandexQueryCreateQueryRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.queries.create@as", - "yq.queries.invoke@as", - "yq.connections.use@as", - "yq.bindings.use@as", - "yq.resources.managePublic@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryCreateQueryRequest, - NYq::TEvControlPlaneProxy::TEvCreateQueryRequest, - NYq::TEvControlPlaneProxy::TEvCreateQueryResponse, - YandexQuery::CreateQueryResult>(ev->Release().Release(), permissions)); -} - -void TGRpcRequestProxy::Handle(TEvYandexQueryListQueriesRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.queries.get@as", - "yq.resources.viewPublic@as", - "yq.resources.viewPrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryListQueriesRequest, - NYq::TEvControlPlaneProxy::TEvListQueriesRequest, - NYq::TEvControlPlaneProxy::TEvListQueriesResponse, - YandexQuery::ListQueriesResult>(ev->Release().Release(), permissions)); -} +using TYandexQueryCreateQueryRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::CreateQueryRequest, YandexQuery::CreateQueryResponse>, + NYq::TEvControlPlaneProxy::TEvCreateQueryRequest, + NYq::TEvControlPlaneProxy::TEvCreateQueryResponse>; -void TGRpcRequestProxy::Handle(TEvYandexQueryDescribeQueryRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.queries.get@as", - "yq.queries.viewAst@as", - "yq.resources.viewPublic@as", - "yq.resources.viewPrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryDescribeQueryRequest, - NYq::TEvControlPlaneProxy::TEvDescribeQueryRequest, - NYq::TEvControlPlaneProxy::TEvDescribeQueryResponse, - YandexQuery::DescribeQueryResult>(ev->Release().Release(), permissions)); +void DoYandexQueryCreateQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryCreateQueryRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvYandexQueryGetQueryStatusRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.queries.getStatus@as", - "yq.resources.viewPublic@as", - "yq.resources.viewPrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryGetQueryStatusRequest, - NYq::TEvControlPlaneProxy::TEvGetQueryStatusRequest, - NYq::TEvControlPlaneProxy::TEvGetQueryStatusResponse, - YandexQuery::GetQueryStatusResult>(ev->Release().Release(), permissions)); -} +using TYandexQueryListQueriesRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ListQueriesRequest, YandexQuery::ListQueriesResponse>, + NYq::TEvControlPlaneProxy::TEvListQueriesRequest, + NYq::TEvControlPlaneProxy::TEvListQueriesResponse>; -void TGRpcRequestProxy::Handle(TEvYandexQueryModifyQueryRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.queries.update@as", - "yq.queries.invoke@as", - "yq.resources.managePublic@as", - "yq.resources.managePrivate@as", - "yq.connections.use@as", - "yq.bindings.use@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryModifyQueryRequest, - NYq::TEvControlPlaneProxy::TEvModifyQueryRequest, - NYq::TEvControlPlaneProxy::TEvModifyQueryResponse, - YandexQuery::ModifyQueryResult>(ev->Release().Release(), permissions)); +void DoYandexQueryListQueriesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryListQueriesRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvYandexQueryDeleteQueryRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.queries.delete@as", - "yq.resources.managePublic@as", - "yq.resources.managePrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryDeleteQueryRequest, - NYq::TEvControlPlaneProxy::TEvDeleteQueryRequest, - NYq::TEvControlPlaneProxy::TEvDeleteQueryResponse, - YandexQuery::DeleteQueryResult>(ev->Release().Release(), permissions)); -} +using TYandexQueryDescribeQueryRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::DescribeQueryRequest, YandexQuery::DescribeQueryResponse>, + NYq::TEvControlPlaneProxy::TEvDescribeQueryRequest, + NYq::TEvControlPlaneProxy::TEvDescribeQueryResponse>; -void TGRpcRequestProxy::Handle(TEvYandexQueryControlQueryRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.queries.control@as", - "yq.resources.managePublic@as", - "yq.resources.managePrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryControlQueryRequest, - NYq::TEvControlPlaneProxy::TEvControlQueryRequest, - NYq::TEvControlPlaneProxy::TEvControlQueryResponse, - YandexQuery::ControlQueryResult>(ev->Release().Release(), permissions)); +void DoYandexQueryDescribeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryDescribeQueryRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvYandexQueryGetResultDataRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.queries.getData@as", - "yq.resources.viewPublic@as", - "yq.resources.viewPrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryGetResultDataRequest, - NYq::TEvControlPlaneProxy::TEvGetResultDataRequest, - NYq::TEvControlPlaneProxy::TEvGetResultDataResponse, - YandexQuery::GetResultDataResult>(ev->Release().Release(), permissions)); -} +using TYandexQueryGetQueryStatusRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::GetQueryStatusRequest, YandexQuery::GetQueryStatusResponse>, + NYq::TEvControlPlaneProxy::TEvGetQueryStatusRequest, + NYq::TEvControlPlaneProxy::TEvGetQueryStatusResponse>; -void TGRpcRequestProxy::Handle(TEvYandexQueryListJobsRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.jobs.get@as", - "yq.resources.viewPublic@as", - "yq.resources.viewPrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryListJobsRequest, - NYq::TEvControlPlaneProxy::TEvListJobsRequest, - NYq::TEvControlPlaneProxy::TEvListJobsResponse, - YandexQuery::ListJobsResult>(ev->Release().Release(), permissions)); +void DoYandexQueryGetQueryStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryGetQueryStatusRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvYandexQueryDescribeJobRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.jobs.get@as", - "yq.resources.viewPublic@as", - "yq.resources.viewPrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryDescribeJobRequest, - NYq::TEvControlPlaneProxy::TEvDescribeJobRequest, - NYq::TEvControlPlaneProxy::TEvDescribeJobResponse, - YandexQuery::DescribeJobResult>(ev->Release().Release(), permissions)); -} +using TYandexQueryModifyQueryRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ModifyQueryRequest, YandexQuery::ModifyQueryResponse>, + NYq::TEvControlPlaneProxy::TEvModifyQueryRequest, + NYq::TEvControlPlaneProxy::TEvModifyQueryResponse>; -void TGRpcRequestProxy::Handle(TEvYandexQueryCreateConnectionRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.connections.create@as", - "yq.resources.managePublic@as", - - // service account permissions - "iam.serviceAccounts.use@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryCreateConnectionRequest, - NYq::TEvControlPlaneProxy::TEvCreateConnectionRequest, - NYq::TEvControlPlaneProxy::TEvCreateConnectionResponse, - YandexQuery::CreateConnectionResult>(ev->Release().Release(), permissions)); +void DoYandexQueryModifyQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryModifyQueryRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvYandexQueryListConnectionsRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.connections.get@as", - "yq.resources.viewPublic@as", - "yq.resources.viewPrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryListConnectionsRequest, - NYq::TEvControlPlaneProxy::TEvListConnectionsRequest, - NYq::TEvControlPlaneProxy::TEvListConnectionsResponse, - YandexQuery::ListConnectionsResult>(ev->Release().Release(), permissions)); -} +using TYandexQueryDeleteQueryRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::DeleteQueryRequest, YandexQuery::DeleteQueryResponse>, + NYq::TEvControlPlaneProxy::TEvDeleteQueryRequest, + NYq::TEvControlPlaneProxy::TEvDeleteQueryResponse>; -void TGRpcRequestProxy::Handle(TEvYandexQueryDescribeConnectionRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.connections.get@as", - "yq.resources.viewPublic@as", - "yq.resources.viewPrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryDescribeConnectionRequest, - NYq::TEvControlPlaneProxy::TEvDescribeConnectionRequest, - NYq::TEvControlPlaneProxy::TEvDescribeConnectionResponse, - YandexQuery::DescribeConnectionResult>(ev->Release().Release(), permissions)); +void DoYandexQueryDeleteQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryDeleteQueryRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvYandexQueryModifyConnectionRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.connections.update@as", - "yq.resources.managePublic@as", - "yq.resources.managePrivate@as", - - // service account permissions - "iam.serviceAccounts.use@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryModifyConnectionRequest, - NYq::TEvControlPlaneProxy::TEvModifyConnectionRequest, - NYq::TEvControlPlaneProxy::TEvModifyConnectionResponse, - YandexQuery::ModifyConnectionResult>(ev->Release().Release(), permissions)); -} +using TYandexQueryControlQueryRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ControlQueryRequest, YandexQuery::ControlQueryResponse>, + NYq::TEvControlPlaneProxy::TEvControlQueryRequest, + NYq::TEvControlPlaneProxy::TEvControlQueryResponse>; -void TGRpcRequestProxy::Handle(TEvYandexQueryDeleteConnectionRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.connections.delete@as", - "yq.resources.managePublic@as", - "yq.resources.managePrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryDeleteConnectionRequest, - NYq::TEvControlPlaneProxy::TEvDeleteConnectionRequest, - NYq::TEvControlPlaneProxy::TEvDeleteConnectionResponse, - YandexQuery::DeleteConnectionResult>(ev->Release().Release(), permissions)); +void DoYandexQueryControlQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryControlQueryRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvYandexQueryTestConnectionRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.connections.create@as", - - // service account permissions - "iam.serviceAccounts.use@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryTestConnectionRequest, - NYq::TEvControlPlaneProxy::TEvTestConnectionRequest, - NYq::TEvControlPlaneProxy::TEvTestConnectionResponse, - YandexQuery::TestConnectionResult>(ev->Release().Release(), permissions)); -} +using TYandexQueryGetResultDataRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::GetResultDataRequest, YandexQuery::GetResultDataResponse>, + NYq::TEvControlPlaneProxy::TEvGetResultDataRequest, + NYq::TEvControlPlaneProxy::TEvGetResultDataResponse>; -void TGRpcRequestProxy::Handle(TEvYandexQueryCreateBindingRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.bindings.create@as", - "yq.resources.managePublic@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryCreateBindingRequest, - NYq::TEvControlPlaneProxy::TEvCreateBindingRequest, - NYq::TEvControlPlaneProxy::TEvCreateBindingResponse, - YandexQuery::CreateBindingResult>(ev->Release().Release(), permissions)); +void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryGetResultDataRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvYandexQueryListBindingsRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.bindings.get@as", - "yq.resources.viewPublic@as", - "yq.resources.viewPrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryListBindingsRequest, - NYq::TEvControlPlaneProxy::TEvListBindingsRequest, - NYq::TEvControlPlaneProxy::TEvListBindingsResponse, - YandexQuery::ListBindingsResult>(ev->Release().Release(), permissions)); -} +using TYandexQueryListJobsRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ListJobsRequest, YandexQuery::ListJobsResponse>, + NYq::TEvControlPlaneProxy::TEvListJobsRequest, + NYq::TEvControlPlaneProxy::TEvListJobsResponse>; -void TGRpcRequestProxy::Handle(TEvYandexQueryDescribeBindingRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.bindings.get@as", - "yq.resources.viewPublic@as", - "yq.resources.viewPrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryDescribeBindingRequest, - NYq::TEvControlPlaneProxy::TEvDescribeBindingRequest, - NYq::TEvControlPlaneProxy::TEvDescribeBindingResponse, - YandexQuery::DescribeBindingResult>(ev->Release().Release(), permissions)); +void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryListJobsRPC(p.release())); } -void TGRpcRequestProxy::Handle(TEvYandexQueryModifyBindingRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.bindings.update@as", - "yq.resources.managePublic@as", - "yq.resources.managePrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryModifyBindingRequest, - NYq::TEvControlPlaneProxy::TEvModifyBindingRequest, - NYq::TEvControlPlaneProxy::TEvModifyBindingResponse, - YandexQuery::ModifyBindingResult>(ev->Release().Release(), permissions)); -} +using TYandexQueryDescribeJobRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::DescribeJobRequest, YandexQuery::DescribeJobResponse>, + NYq::TEvControlPlaneProxy::TEvDescribeJobRequest, + NYq::TEvControlPlaneProxy::TEvDescribeJobResponse>; -void TGRpcRequestProxy::Handle(TEvYandexQueryDeleteBindingRequest::TPtr& ev, const TActorContext& ctx) { - static const TVector<TString> permissions = { - // folder permissions - "yq.bindings.delete@as", - "yq.resources.managePublic@as", - "yq.resources.managePrivate@as" - }; - ctx.Register(new TYandexQueryRPC<TEvYandexQueryDeleteBindingRequest, - NYq::TEvControlPlaneProxy::TEvDeleteBindingRequest, - NYq::TEvControlPlaneProxy::TEvDeleteBindingResponse, - YandexQuery::DeleteBindingResult>(ev->Release().Release(), permissions)); +void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryDescribeJobRPC(p.release())); } -template<typename T> -TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetAdditionalEntries(const T&) -{ - return {}; -} +using TYandexQueryCreateConnectionRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::CreateConnectionRequest, YandexQuery::CreateConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvCreateConnectionRequest, + NYq::TEvControlPlaneProxy::TEvCreateConnectionResponse>; -template<> -TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetAdditionalEntries(const TEvYandexQueryCreateConnectionRequest& request) -{ - TString serviceAccountId = NYq::ExtractServiceAccountId(*request.GetProtoRequest()); - if (serviceAccountId) { - return {{ - {"iam.serviceAccounts.use"}, - { - {"service_account_id", serviceAccountId}, - {"database_id", "db"} - } - }}; - } - return {}; +void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryCreateConnectionRPC(p.release())); } -template<> -TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetAdditionalEntries(const TEvYandexQueryModifyConnectionRequest& request) -{ - TString serviceAccountId = NYq::ExtractServiceAccountId(*request.GetProtoRequest()); - if (serviceAccountId) { - return {{ - {"iam.serviceAccounts.use"}, - { - {"service_account_id", serviceAccountId}, - {"database_id", "db"} - } - }}; - } - return {}; -} +using TYandexQueryListConnectionsRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ListConnectionsRequest, YandexQuery::ListConnectionsResponse>, + NYq::TEvControlPlaneProxy::TEvListConnectionsRequest, + NYq::TEvControlPlaneProxy::TEvListConnectionsResponse>; -template<> -TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetAdditionalEntries(const TEvYandexQueryTestConnectionRequest& request) -{ - TString serviceAccountId = NYq::ExtractServiceAccountId(*request.GetProtoRequest()); - if (serviceAccountId) { - return {{ - {"iam.serviceAccounts.use"}, - { - {"service_account_id", serviceAccountId}, - {"database_id", "db"} - } - }}; - } - return {}; +void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryListConnectionsRPC(p.release())); } -template <typename TEvent> -void TGrpcRequestCheckActor<TEvent>::InitializeAttributesFromYandexQuery(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - CheckedDatabaseName_ = CanonizePath(schemeData.GetPath()); - const TString scope = Request_->Get()->GetPeerMetaValues("x-yq-scope").GetOrElse(""); - if (scope.StartsWith("yandexcloud://")) { - const TVector<TString> path = StringSplitter(scope).Split('/').SkipEmpty(); - if (path.size() == 2 || path.size() == 3) { - const TString& folderId = path.back(); - TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> entries {{ - GetPermissions(), - { - {"folder_id", folderId}, - {"database_id", "db"} - } - }}; +using TYandexQueryDescribeConnectionRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::DescribeConnectionRequest, YandexQuery::DescribeConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvDescribeConnectionRequest, + NYq::TEvControlPlaneProxy::TEvDescribeConnectionResponse>; - const auto& request = *Request_->Get(); - const auto additionalEntries = GetAdditionalEntries(request); - entries.insert(entries.end(), additionalEntries.begin(), additionalEntries.end()); - TBase::SetEntries(entries); - } - } +void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryDescribeConnectionRPC(p.release())); } -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryCreateQueryRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.queries.create", - "yq.queries.invoke", - "yq.connections.use", - "yq.bindings.use", - "yq.resources.managePublic" - }; - return permissions; -} +using TYandexQueryModifyConnectionRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ModifyConnectionRequest, YandexQuery::ModifyConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvModifyConnectionRequest, + NYq::TEvControlPlaneProxy::TEvModifyConnectionResponse>; -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListQueriesRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.queries.get", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - return permissions; +void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryModifyConnectionRPC(p.release())); } -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeQueryRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.queries.get", - "yq.queries.viewAst", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - return permissions; -} +using TYandexQueryDeleteConnectionRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::DeleteConnectionRequest, YandexQuery::DeleteConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvDeleteConnectionRequest, + NYq::TEvControlPlaneProxy::TEvDeleteConnectionResponse>; -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryGetQueryStatusRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "yq.queries.getStatus", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - return permissions; +void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryDeleteConnectionRPC(p.release())); } -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryModifyQueryRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.queries.update", - "yq.queries.invoke", - "yq.resources.managePublic", - "yq.resources.managePrivate", - "yq.connections.use", - "yq.bindings.use" - }; - return permissions; -} +using TYandexQueryTestConnectionRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::TestConnectionRequest, YandexQuery::TestConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvTestConnectionRequest, + NYq::TEvControlPlaneProxy::TEvTestConnectionResponse>; -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDeleteQueryRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.queries.delete", - "yq.resources.managePublic", - "yq.resources.managePrivate" - }; - return permissions; +void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryTestConnectionRPC(p.release())); } -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryControlQueryRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.queries.control", - "yq.resources.managePublic", - "yq.resources.managePrivate" - }; - return permissions; -} +using TYandexQueryCreateBindingRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::CreateBindingRequest, YandexQuery::CreateBindingResponse>, + NYq::TEvControlPlaneProxy::TEvCreateBindingRequest, + NYq::TEvControlPlaneProxy::TEvCreateBindingResponse>; -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryGetResultDataRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.queries.getData", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - return permissions; +void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& ) { + TActivationContext::AsActorContext().Register(new TYandexQueryCreateBindingRPC(p.release())); } -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListJobsRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.jobs.get", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - return permissions; -} +using TYandexQueryListBindingsRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ListBindingsRequest, YandexQuery::ListBindingsResponse>, + NYq::TEvControlPlaneProxy::TEvListBindingsRequest, + NYq::TEvControlPlaneProxy::TEvListBindingsResponse>; -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeJobRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.jobs.get", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - return permissions; +void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryListBindingsRPC(p.release())); } -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryCreateConnectionRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.connections.create", - "yq.resources.managePublic" - }; - return permissions; -} - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListConnectionsRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.connections.get", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - return permissions; -} - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeConnectionRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.connections.get", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - return permissions; -} - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryModifyConnectionRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.connections.update", - "yq.resources.managePublic", - "yq.resources.managePrivate" - }; - return permissions; -} - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDeleteConnectionRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.connections.delete", - "yq.resources.managePublic", - "yq.resources.managePrivate" - }; - return permissions; -} - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryTestConnectionRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "yq.connections.create" - }; - return permissions; -} - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryCreateBindingRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.bindings.create", - "yq.resources.managePublic" - }; - return permissions; -} - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListBindingsRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.bindings.get", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - return permissions; -} - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeBindingRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.bindings.get", - "yq.resources.viewPublic", - "yq.resources.viewPrivate" - }; - return permissions; -} - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryModifyBindingRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.bindings.update", - "yq.resources.managePublic", - "yq.resources.managePrivate" - }; - return permissions; -} - -template <> -const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDeleteBindingRequest>::GetPermissions() { - static const TVector<TString> permissions = { - "ydb.tables.list", // TODO: delete after enabling permission validation - "yq.bindings.delete", - "yq.resources.managePublic", - "yq.resources.managePrivate" - }; - return permissions; -} +using TYandexQueryDescribeBindingRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::DescribeBindingRequest, YandexQuery::DescribeBindingResponse>, + NYq::TEvControlPlaneProxy::TEvDescribeBindingRequest, + NYq::TEvControlPlaneProxy::TEvDescribeBindingResponse>; -// yq behavior -template <> -void TGrpcRequestCheckActor<TEvYandexQueryCreateQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); +void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryDescribeBindingRPC(p.release())); } -template <> -void TGrpcRequestCheckActor<TEvYandexQueryListQueriesRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryDescribeQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryGetQueryStatusRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryModifyQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} +using TYandexQueryModifyBindingRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::ModifyBindingRequest, YandexQuery::ModifyBindingResponse>, + NYq::TEvControlPlaneProxy::TEvModifyBindingRequest, + NYq::TEvControlPlaneProxy::TEvModifyBindingResponse>; -template <> -void TGrpcRequestCheckActor<TEvYandexQueryDeleteQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); +void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryModifyBindingRPC(p.release())); } -template <> -void TGrpcRequestCheckActor<TEvYandexQueryControlQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryGetResultDataRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryListJobsRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryDescribeJobRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryCreateConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryListConnectionsRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryDescribeConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryModifyConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryDeleteConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryTestConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryCreateBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryListBindingsRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryDescribeBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} - -template <> -void TGrpcRequestCheckActor<TEvYandexQueryModifyBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); -} +using TYandexQueryDeleteBindingRPC = TYandexQueryRequestRPC< + TGrpcYqRequestOperationCall<YandexQuery::DeleteBindingRequest, YandexQuery::DeleteBindingResponse>, + NYq::TEvControlPlaneProxy::TEvDeleteBindingRequest, + NYq::TEvControlPlaneProxy::TEvDeleteBindingResponse>; -template <> -void TGrpcRequestCheckActor<TEvYandexQueryDeleteBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) { - InitializeAttributesFromYandexQuery(schemeData); +void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TYandexQueryDeleteBindingRPC(p.release())); } } // namespace NGRpcService diff --git a/ydb/core/grpc_services/service_datastreams.h b/ydb/core/grpc_services/service_datastreams.h new file mode 100644 index 0000000000..7d8488c30e --- /dev/null +++ b/ydb/core/grpc_services/service_datastreams.h @@ -0,0 +1,46 @@ +#pragma once +#include <memory> + +namespace NActors { +struct TActorId; +} + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +void DoDataStreamsCreateStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsDeleteStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsDescribeStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsPutRecordRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsRegisterStreamConsumerRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsDeregisterStreamConsumerRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsDescribeStreamConsumerRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsListStreamsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsListShardsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsPutRecordsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsGetRecordsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsGetShardIteratorRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsSubscribeToShardRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsDescribeLimitsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsDescribeStreamSummaryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsDecreaseStreamRetentionPeriodRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsIncreaseStreamRetentionPeriodRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsUpdateShardCountRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsListStreamConsumersRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsAddTagsToStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsDisableEnhancedMonitoringRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsEnableEnhancedMonitoringRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsListTagsForStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsUpdateStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsSetWriteQuotaRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsMergeShardsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsRemoveTagsFromStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsSplitShardRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsStartStreamEncryptionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); +void DoDataStreamsStopStreamEncryptionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&); + +} +} diff --git a/ydb/core/grpc_services/service_yq.h b/ydb/core/grpc_services/service_yq.h new file mode 100644 index 0000000000..8a9f96cdf2 --- /dev/null +++ b/ydb/core/grpc_services/service_yq.h @@ -0,0 +1,92 @@ +#pragma once + +#include <algorithm> +#include <memory> + +#include <ydb/core/base/ticket_parser.h> +#include <ydb/core/yq/libs/control_plane_proxy/utils.h> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +template <typename TReq, typename TResp> +class TGrpcYqRequestOperationCall : public TGrpcRequestOperationCall<TReq, TResp> { + +public: + using TBase = TGrpcRequestOperationCall<TReq, TResp>; + using TBase::GetProtoRequest; + using TBase::GetPeerMetaValues; + + const TVector<TString>& Permissions; + TVector<TString> Sids; + + TGrpcYqRequestOperationCall(NGrpc::IRequestContextBase* ctx, + void (*cb)(std::unique_ptr<IRequestOpCtx>, const IFacilityProvider&), + const TVector<TString>& permissions) + : TGrpcRequestOperationCall<TReq, TResp>(ctx, cb, {}), Permissions(permissions) { + } + + bool TryCustomAttributeProcess(const TSchemeBoardEvents::TDescribeSchemeResult& , ICheckerIface* iface) override { + + const TString scope = GetPeerMetaValues("x-yq-scope").GetOrElse(""); + if (scope.StartsWith("yandexcloud://")) { + const TVector<TString> path = StringSplitter(scope).Split('/').SkipEmpty(); + if (path.size() == 2) { + const TString& folderId = path.back(); + TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> entries {{ + Permissions, + { + {"folder_id", folderId}, + {"database_id", "db"} + } + }}; + std::transform(Permissions.begin(), Permissions.end(), std::back_inserter(Sids), + [](const TString& s) -> TString { return s + "@as"; }); + + auto serviceAccountId = NYq::ExtractServiceAccountId(*GetProtoRequest()); + if (serviceAccountId) { + entries.push_back({ + {"iam.serviceAccounts.use"}, + { + {"service_account_id", serviceAccountId}, + {"database_id", "db"} + }}); + Sids.push_back("iam.serviceAccounts.use@as"); + } + + iface->SetEntries(entries); + return true; + } + } + + return false; + } +}; + +void DoYandexQueryCreateQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryListQueriesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryDescribeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryGetQueryStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryModifyQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryDeleteQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryControlQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); + +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/core/http_proxy/auth_factory.cpp b/ydb/core/http_proxy/auth_factory.cpp index 3ff3e3055e..03f315cad3 100644 --- a/ydb/core/http_proxy/auth_factory.cpp +++ b/ydb/core/http_proxy/auth_factory.cpp @@ -2,7 +2,6 @@ #include "http_service.h" #include "http_req.h" #include "metrics_actor.h" -#include "driver_cache_actor.h" #include "discovery_actor.h" #include <library/cpp/actors/http/http_proxy.h> @@ -35,12 +34,7 @@ void TAuthFactory::Initialize( const NYdb::TCredentialsProviderPtr credentialsProvider = NYdb::CreateInsecureCredentialsProviderFactory()->CreateProvider(); - IActor* actor = NKikimr::NHttpProxy::CreateStaticDiscoveryActor(grpcPort); - localServices.push_back(std::pair<TActorId, TActorSetupCmd>( - NKikimr::NHttpProxy::MakeTenantDiscoveryID(), - TActorSetupCmd(actor, TMailboxType::HTSwap, appData.UserPoolId))); - - actor = NKikimr::NHttpProxy::CreateMetricsActor(NKikimr::NHttpProxy::TMetricsSettings{appData.Counters->GetSubgroup("counters", "http_proxy")}); + auto actor = NKikimr::NHttpProxy::CreateMetricsActor(NKikimr::NHttpProxy::TMetricsSettings{appData.Counters->GetSubgroup("counters", "http_proxy")}); localServices.push_back(std::pair<TActorId, TActorSetupCmd>( NKikimr::NHttpProxy::MakeMetricsServiceID(), TActorSetupCmd(actor, TMailboxType::HTSwap, appData.UserPoolId))); diff --git a/ydb/core/http_proxy/discovery_actor.cpp b/ydb/core/http_proxy/discovery_actor.cpp index 7e85f4700c..674ed7af79 100644 --- a/ydb/core/http_proxy/discovery_actor.cpp +++ b/ydb/core/http_proxy/discovery_actor.cpp @@ -1,4 +1,3 @@ -#include "driver_cache_actor.h" #include "events.h" #include "discovery_actor.h" diff --git a/ydb/core/http_proxy/driver_cache_actor.cpp b/ydb/core/http_proxy/driver_cache_actor.cpp index 71f4799ace..e69de29bb2 100644 --- a/ydb/core/http_proxy/driver_cache_actor.cpp +++ b/ydb/core/http_proxy/driver_cache_actor.cpp @@ -1,80 +0,0 @@ -#include "driver_cache_actor.h" -#include "events.h" -#include "http_req.h" - -#include <library/cpp/actors/core/actor_bootstrapped.h> -#include <library/cpp/actors/core/events.h> -#include <library/cpp/actors/core/hfunc.h> -#include <library/cpp/actors/http/http_proxy.h> -#include <library/cpp/cache/cache.h> - -#include <util/stream/file.h> -#include <util/string/builder.h> -#include <util/string/vector.h> -#include <util/system/hostname.h> - -namespace NKikimr::NHttpProxy { - - using namespace NActors; - - - class TStaticDiscoveryActor : public NActors::TActorBootstrapped<TStaticDiscoveryActor> { - using TBase = NActors::TActorBootstrapped<TStaticDiscoveryActor>; - public: - explicit TStaticDiscoveryActor(ui16 port) - { - Database.Reset(new TDatabase(TStringBuilder() << FQDNHostName() << ":" << port, "database_id", "", "cloud_id", "folder_id")); - } - - void Bootstrap(const TActorContext& ctx) { - TBase::Become(&TStaticDiscoveryActor::StateWork); - Y_UNUSED(ctx); - } - - private: - STFUNC(StateWork) { - switch (ev->GetTypeRewrite()) { - HFunc(TEvServerlessProxy::TEvDiscoverDatabaseEndpointRequest, Handle); - } - } - - THolder<TDatabase> Database; - - TVector<std::pair<TActorId, TString>> Waiters; - - void Handle(TEvServerlessProxy::TEvDiscoverDatabaseEndpointRequest::TPtr& ev, const TActorContext& ctx); - - TStringBuilder LogPrefix() const { - return TStringBuilder(); - } - - void ProcessWaiters(const TActorContext& ctx); - }; - - - void TStaticDiscoveryActor::Handle(TEvServerlessProxy::TEvDiscoverDatabaseEndpointRequest::TPtr& ev, const TActorContext& ctx) { - Waiters.push_back(std::make_pair(ev->Sender, ev->Get()->DatabasePath)); - if (!Database) { - return; - } - ProcessWaiters(ctx); - } - - void TStaticDiscoveryActor::ProcessWaiters(const TActorContext& ctx) { - Y_VERIFY(Database); - for (auto& waiter : Waiters) { - auto result = MakeHolder<TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult>(); - auto database = waiter.second; - result->DatabaseInfo = MakeHolder<TDatabase>(*Database); - result->DatabaseInfo->Path = database; - result->Status = NYdb::EStatus::SUCCESS; - ctx.Send(waiter.first, std::move(result)); - } - Waiters.clear(); - } - - NActors::IActor* CreateStaticDiscoveryActor(ui16 port) { - return new TStaticDiscoveryActor{port}; - } - -} diff --git a/ydb/core/http_proxy/driver_cache_actor.h b/ydb/core/http_proxy/driver_cache_actor.h index e162e6c146..e69de29bb2 100644 --- a/ydb/core/http_proxy/driver_cache_actor.h +++ b/ydb/core/http_proxy/driver_cache_actor.h @@ -1,8 +0,0 @@ -#pragma once - -#include <library/cpp/actors/core/actor.h> - -namespace NKikimr::NHttpProxy { - - NActors::IActor* CreateStaticDiscoveryActor(ui16 port); -} // namespace diff --git a/ydb/core/http_proxy/events.h b/ydb/core/http_proxy/events.h index 12eaf2df32..cd32e92c34 100644 --- a/ydb/core/http_proxy/events.h +++ b/ydb/core/http_proxy/events.h @@ -117,9 +117,12 @@ namespace NKikimr::NHttpProxy { TString ServiceAccountId; TString IamToken; - TEvToken(const TString& serviceAccountId, const TString& iamToken) + TString SerializedUserToken; + + TEvToken(const TString& serviceAccountId, const TString& iamToken, const TString& serializedUserToken = "") : ServiceAccountId(serviceAccountId) , IamToken(iamToken) + , SerializedUserToken(serializedUserToken) {} }; diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index 6d775c012d..ea0b2f7aa2 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -1,4 +1,3 @@ -#include "driver_cache_actor.h" #include "events.h" #include "http_req.h" #include "auth_factory.h" @@ -6,14 +5,17 @@ #include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> #include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h> #include <ydb/core/grpc_caching/cached_grpc_request_actor.h> +#include <ydb/core/grpc_services/local_rpc/local_rpc.h> #include <ydb/core/protos/serverless_proxy_config.pb.h> #include <ydb/services/datastreams/shard_iterator.h> #include <ydb/services/datastreams/next_token.h> +#include <ydb/services/datastreams/datastreams_proxy.h> #include <ydb/core/viewer/json/json.h> #include <ydb/core/base/appdata.h> #include <ydb/library/naming_conventions/naming_conventions.h> +#include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/library/http_proxy/authorization/auth_helpers.h> #include <ydb/library/http_proxy/error/error.h> @@ -38,7 +40,6 @@ namespace NKikimr::NHttpProxy { - using namespace google::protobuf; using namespace Ydb::DataStreams::V1; using namespace NYdb::NDataStreams::V1; @@ -556,7 +557,7 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") ); } - template<class TProtoService, class TProtoRequest, class TProtoResponse, class TProtoResult, class TProtoCall> + template<class TProtoService, class TProtoRequest, class TProtoResponse, class TProtoResult, class TProtoCall, class TRpcEv> class THttpRequestProcessor : public IHttpRequestProcessor { public: enum TRequestState { @@ -630,6 +631,8 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") } void SendYdbDriverRequest(const TActorContext& ctx) { + Y_VERIFY(HttpContext.Driver); + RequestState = StateAuthorization; auto request = MakeHolder<TEvServerlessProxy::TEvDiscoverDatabaseEndpointRequest>(); @@ -675,7 +678,24 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") "sending grpc request to '" << HttpContext.DiscoveryEndpoint << "' database: '" << HttpContext.DatabaseName << "' iam token size: " << HttpContext.IamToken.size()); - + if (!HttpContext.Driver) { + RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), HttpContext.DatabaseName, HttpContext.SerializedUserToken, ctx.ActorSystem()); + RpcFuture.Subscribe([actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()](const NThreading::TFuture<TProtoResponse>& future) { + auto& response = future.GetValueSync(); + auto result = MakeHolder<TEvServerlessProxy::TEvGrpcRequestResult>(); + Y_VERIFY(response.operation().ready()); + if (response.operation().status() == Ydb::StatusIds::SUCCESS) { + TProtoResult rs; + response.operation().result().UnpackTo(&rs); + result->Message = MakeHolder<TProtoResult>(rs); + } + NYql::TIssues issues; + NYql::IssuesFromMessage(response.operation().issues(), issues); + result->Status = MakeHolder<NYdb::TStatus>(NYdb::EStatus(response.operation().status()), std::move(issues)); + actorSystem->Send(actorId, result.Release()); + }); + return; + } Y_VERIFY(Client); Y_VERIFY(DiscoveryFuture->HasValue()); @@ -703,7 +723,13 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") void HandleToken(TEvServerlessProxy::TEvToken::TPtr& ev, const TActorContext& ctx) { HttpContext.ServiceAccountId = ev->Get()->ServiceAccountId; HttpContext.IamToken = ev->Get()->IamToken; - SendYdbDriverRequest(ctx); + HttpContext.SerializedUserToken = ev->Get()->SerializedUserToken; + + if (HttpContext.Driver) { + SendYdbDriverRequest(ctx); + } else { + SendGrpcRequest(ctx); + } } void HandleError(TEvServerlessProxy::TEvError::TPtr& ev, const TActorContext& ctx) { @@ -830,8 +856,12 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") "database '" << HttpContext.DatabaseName << "' " << "stream '" << ExtractStreamName<TProtoRequest>(Request) << "'"); - if (HttpContext.IamToken.empty()) { - AuthActor = ctx.Register(AppData(ctx)->DataStreamsAuthFactory->CreateAuthActor(ctx.SelfID, HttpContext, std::move(Signature))); + if (HttpContext.IamToken.empty() || !HttpContext.Driver) { //use Signature or no sdk mode - then need to auth anyway + if (HttpContext.IamToken.empty() && !Signature) { //Test mode - no driver and no creds + SendGrpcRequest(ctx); + } else { + AuthActor = ctx.Register(AppData(ctx)->DataStreamsAuthFactory->CreateAuthActor(ctx.SelfID, HttpContext, std::move(Signature))); + } } else { SendYdbDriverRequest(ctx); } @@ -849,6 +879,7 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") THttpRequestContext HttpContext; THolder<NKikimr::NSQS::TAwsRequestSignV4> Signature; THolder<NThreading::TFuture<TProtoResultWrapper<TProtoResult>>> Future; + NThreading::TFuture<TProtoResponse> RpcFuture; THolder<NThreading::TFuture<void>> DiscoveryFuture; TProtoCall ProtoCall; TString Method; @@ -880,60 +911,37 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second") void THttpRequestProcessors::Initialize() { - Name2Processor["PutRecords"] = MakeHolder<THttpRequestProcessor<DataStreamsService, PutRecordsRequest, PutRecordsResponse, PutRecordsResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncPutRecords)>>("PutRecords", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncPutRecords); - Name2Processor["CreateStream"] = MakeHolder<THttpRequestProcessor<DataStreamsService, CreateStreamRequest, CreateStreamResponse, CreateStreamResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncCreateStream)>>("CreateStream", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncCreateStream); - Name2Processor["ListStreams"] = MakeHolder<THttpRequestProcessor<DataStreamsService, ListStreamsRequest, ListStreamsResponse, ListStreamsResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListStreams)>>("ListStreams", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListStreams); - Name2Processor["DeleteStream"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DeleteStreamRequest, DeleteStreamResponse, DeleteStreamResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDeleteStream)>>("DeleteStream", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDeleteStream); - Name2Processor["DescribeStream"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DescribeStreamRequest, DescribeStreamResponse, DescribeStreamResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeStream)>>("DescribeStream", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeStream); - Name2Processor["ListShards"] = MakeHolder<THttpRequestProcessor<DataStreamsService, ListShardsRequest, ListShardsResponse, ListShardsResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListShards)>>("ListShards", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListShards); - Name2Processor["PutRecord"] = MakeHolder<THttpRequestProcessor<DataStreamsService, PutRecordRequest, PutRecordResponse, PutRecordResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncPutRecord)>>("PutRecord", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncPutRecord); - Name2Processor["GetRecords"] = MakeHolder<THttpRequestProcessor<DataStreamsService, GetRecordsRequest, GetRecordsResponse, GetRecordsResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncGetRecords)>>("GetRecords", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncGetRecords); - Name2Processor["GetShardIterator"] = MakeHolder<THttpRequestProcessor<DataStreamsService, GetShardIteratorRequest, GetShardIteratorResponse, GetShardIteratorResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncGetShardIterator)>>("GetShardIterator", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncGetShardIterator); - Name2Processor["DescribeLimits"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DescribeLimitsRequest, DescribeLimitsResponse, DescribeLimitsResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeLimits)>>("DescribeLimits", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeLimits); - Name2Processor["DescribeStreamSummary"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DescribeStreamSummaryRequest, DescribeStreamSummaryResponse, DescribeStreamSummaryResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeStreamSummary)>>("DescribeStreamSummary", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeStreamSummary); - Name2Processor["DecreaseStreamRetentionPeriod"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DecreaseStreamRetentionPeriodRequest, DecreaseStreamRetentionPeriodResponse, DecreaseStreamRetentionPeriodResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDecreaseStreamRetentionPeriod)>>("DecreaseStreamRetentionPeriod", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDecreaseStreamRetentionPeriod); - Name2Processor["IncreaseStreamRetentionPeriod"] = MakeHolder<THttpRequestProcessor<DataStreamsService, IncreaseStreamRetentionPeriodRequest, IncreaseStreamRetentionPeriodResponse, IncreaseStreamRetentionPeriodResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncIncreaseStreamRetentionPeriod)>>("IncreaseStreamRetentionPeriod", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncIncreaseStreamRetentionPeriod); - Name2Processor["UpdateShardCount"] = MakeHolder<THttpRequestProcessor<DataStreamsService, UpdateShardCountRequest, UpdateShardCountResponse, UpdateShardCountResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncUpdateShardCount)>>("UpdateShardCount", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncUpdateShardCount); - Name2Processor["RegisterStreamConsumer"] = MakeHolder<THttpRequestProcessor<DataStreamsService, RegisterStreamConsumerRequest, RegisterStreamConsumerResponse, RegisterStreamConsumerResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncRegisterStreamConsumer)>>("RegisterStreamConsumer", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncRegisterStreamConsumer); - Name2Processor["DeregisterStreamConsumer"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DeregisterStreamConsumerRequest, DeregisterStreamConsumerResponse, DeregisterStreamConsumerResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDeregisterStreamConsumer)>>("DeregisterStreamConsumer", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDeregisterStreamConsumer); - Name2Processor["DescribeStreamConsumer"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DescribeStreamConsumerRequest, DescribeStreamConsumerResponse, DescribeStreamConsumerResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeStreamConsumer)>>("DescribeStreamConsumer", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeStreamConsumer); - Name2Processor["ListStreamConsumers"] = MakeHolder<THttpRequestProcessor<DataStreamsService, ListStreamConsumersRequest, ListStreamConsumersResponse, ListStreamConsumersResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListStreamConsumers)>>("ListStreamConsumers", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListStreamConsumers); - Name2Processor["AddTagsToStream"] = MakeHolder<THttpRequestProcessor<DataStreamsService, AddTagsToStreamRequest, AddTagsToStreamResponse, AddTagsToStreamResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncAddTagsToStream)>>("AddTagsToStream", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncAddTagsToStream); - Name2Processor["DisableEnhancedMonitoring"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DisableEnhancedMonitoringRequest, DisableEnhancedMonitoringResponse, DisableEnhancedMonitoringResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDisableEnhancedMonitoring)>>("DisableEnhancedMonitoring", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDisableEnhancedMonitoring); - Name2Processor["EnableEnhancedMonitoring"] = MakeHolder<THttpRequestProcessor<DataStreamsService, EnableEnhancedMonitoringRequest, EnableEnhancedMonitoringResponse, EnableEnhancedMonitoringResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncEnableEnhancedMonitoring)>>("EnableEnhancedMonitoring", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncEnableEnhancedMonitoring); - Name2Processor["ListTagsForStream"] = MakeHolder<THttpRequestProcessor<DataStreamsService, ListTagsForStreamRequest, ListTagsForStreamResponse, ListTagsForStreamResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListTagsForStream)>>("ListTagsForStream", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListTagsForStream); - Name2Processor["MergeShards"] = MakeHolder<THttpRequestProcessor<DataStreamsService, MergeShardsRequest, MergeShardsResponse, MergeShardsResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncMergeShards)>>("MergeShards", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncMergeShards); - Name2Processor["RemoveTagsFromStream"] = MakeHolder<THttpRequestProcessor<DataStreamsService, RemoveTagsFromStreamRequest, RemoveTagsFromStreamResponse, RemoveTagsFromStreamResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncRemoveTagsFromStream)>>("RemoveTagsFromStream", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncRemoveTagsFromStream); - Name2Processor["SplitShard"] = MakeHolder<THttpRequestProcessor<DataStreamsService, SplitShardRequest, SplitShardResponse, SplitShardResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncSplitShard)>>("SplitShard", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncSplitShard); - Name2Processor["StartStreamEncryption"] = MakeHolder<THttpRequestProcessor<DataStreamsService, StartStreamEncryptionRequest, StartStreamEncryptionResponse, StartStreamEncryptionResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncStartStreamEncryption)>>("StartStreamEncryption", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncStartStreamEncryption); - Name2Processor["StopStreamEncryption"] = MakeHolder<THttpRequestProcessor<DataStreamsService, StopStreamEncryptionRequest, StopStreamEncryptionResponse, StopStreamEncryptionResult, - decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncStopStreamEncryption)>>("StopStreamEncryption", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncStopStreamEncryption); + #define DECLARE_PROCESSOR(name) Name2Processor[#name] = MakeHolder<THttpRequestProcessor<DataStreamsService, name##Request, name##Response, name##Result,\ + decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::Async##name), NKikimr::NGRpcService::TEvDataStreams##name##Request>> \ + (#name, &Ydb::DataStreams::V1::DataStreamsService::Stub::Async##name); + DECLARE_PROCESSOR(PutRecords); + DECLARE_PROCESSOR(CreateStream); + DECLARE_PROCESSOR(ListStreams); + DECLARE_PROCESSOR(DeleteStream); + DECLARE_PROCESSOR(DescribeStream); + DECLARE_PROCESSOR(ListShards); + DECLARE_PROCESSOR(PutRecord); + DECLARE_PROCESSOR(GetRecords); + DECLARE_PROCESSOR(GetShardIterator); + DECLARE_PROCESSOR(DescribeLimits); + DECLARE_PROCESSOR(DescribeStreamSummary); + DECLARE_PROCESSOR(DecreaseStreamRetentionPeriod); + DECLARE_PROCESSOR(IncreaseStreamRetentionPeriod); + DECLARE_PROCESSOR(UpdateShardCount); + DECLARE_PROCESSOR(RegisterStreamConsumer); + DECLARE_PROCESSOR(DeregisterStreamConsumer); + DECLARE_PROCESSOR(DescribeStreamConsumer); + DECLARE_PROCESSOR(ListStreamConsumers); + DECLARE_PROCESSOR(AddTagsToStream); + DECLARE_PROCESSOR(DisableEnhancedMonitoring); + DECLARE_PROCESSOR(EnableEnhancedMonitoring); + DECLARE_PROCESSOR(ListTagsForStream); + DECLARE_PROCESSOR(MergeShards); + DECLARE_PROCESSOR(RemoveTagsFromStream); + DECLARE_PROCESSOR(SplitShard); + DECLARE_PROCESSOR(StartStreamEncryption); + DECLARE_PROCESSOR(StopStreamEncryption); + #undef DECLARE_PROCESSOR } bool THttpRequestProcessors::Execute(const TString& name, THttpRequestContext&& context, THolder<NKikimr::NSQS::TAwsRequestSignV4> signature, const TActorContext& ctx) { diff --git a/ydb/core/http_proxy/http_req.h b/ydb/core/http_proxy/http_req.h index 9936e48ea1..9b1f01af3e 100644 --- a/ydb/core/http_proxy/http_req.h +++ b/ydb/core/http_proxy/http_req.h @@ -77,6 +77,7 @@ struct THttpRequestContext { NActors::TActorId Sender; TString IamToken; + TString SerializedUserToken; const NKikimrConfig::TServerlessProxyConfig& ServiceConfig; NYdb::TDriver* Driver; std::shared_ptr<NYdb::ICredentialsProvider> ServiceAccountCredentialsProvider; diff --git a/ydb/core/http_proxy/http_service.cpp b/ydb/core/http_proxy/http_service.cpp index ba9e849a18..b1b880d015 100644 --- a/ydb/core/http_proxy/http_service.cpp +++ b/ydb/core/http_proxy/http_service.cpp @@ -27,14 +27,16 @@ namespace NKikimr::NHttpProxy { ServiceAccountCredentialsProvider = cfg.CredentialsProvider; Processors = MakeHolder<THttpRequestProcessors>(); Processors->Initialize(); - auto config = NYdb::TDriverConfig().SetNetworkThreadsNum(1) - .SetGRpcKeepAlivePermitWithoutCalls(true) - .SetGRpcKeepAliveTimeout(TDuration::Seconds(90)) - .SetDiscoveryMode(NYdb::EDiscoveryMode::Async); - if (Config.GetCaCert()) { - config.UseSecureConnection(TFileInput(Config.GetCaCert()).ReadAll()); + if (cfg.UseSDK) { + auto config = NYdb::TDriverConfig().SetNetworkThreadsNum(1) + .SetGRpcKeepAlivePermitWithoutCalls(true) + .SetGRpcKeepAliveTimeout(TDuration::Seconds(90)) + .SetDiscoveryMode(NYdb::EDiscoveryMode::Async); + if (Config.GetCaCert()) { + config.UseSecureConnection(TFileInput(Config.GetCaCert()).ReadAll()); + } + Driver = MakeHolder<NYdb::TDriver>(std::move(config)); } - Driver = MakeHolder<NYdb::TDriver>(std::move(config)); } void Bootstrap(const TActorContext& ctx) { diff --git a/ydb/core/http_proxy/http_service.h b/ydb/core/http_proxy/http_service.h index 52f8a36953..3b22d79cfd 100644 --- a/ydb/core/http_proxy/http_service.h +++ b/ydb/core/http_proxy/http_service.h @@ -12,6 +12,7 @@ namespace NKikimr::NHttpProxy { struct THttpProxyConfig { NKikimrConfig::TServerlessProxyConfig Config; std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider; + bool UseSDK = false; }; NActors::IActor* CreateHttpProxy(const THttpProxyConfig& config); diff --git a/ydb/core/http_proxy/ya.make b/ydb/core/http_proxy/ya.make index c7340e13bb..31054b950a 100644 --- a/ydb/core/http_proxy/ya.make +++ b/ydb/core/http_proxy/ya.make @@ -4,8 +4,6 @@ LIBRARY() SRCS( - driver_cache_actor.cpp - driver_cache_actor.h events.h http_req.cpp http_req.h @@ -26,12 +24,15 @@ PEERDIR( library/cpp/actors/core ydb/core/base ydb/core/protos + ydb/core/grpc_services/local_rpc + ydb/library/yql/public/issue ydb/library/http_proxy/authorization ydb/library/http_proxy/error ydb/library/naming_conventions ydb/public/sdk/cpp/client/ydb_datastreams ydb/public/sdk/cpp/client/ydb_persqueue_core ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs + ydb/services/datastreams ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/testlib/service_mocks/access_service_mock.h b/ydb/core/testlib/service_mocks/access_service_mock.h index f18ac92f0a..ab19356a16 100644 --- a/ydb/core/testlib/service_mocks/access_service_mock.h +++ b/ydb/core/testlib/service_mocks/access_service_mock.h @@ -39,6 +39,7 @@ public: } else { key = request->iam_token(); } + auto it = AuthenticateData.find(key); if (it != AuthenticateData.end()) { response->CopyFrom(it->second.Response); @@ -52,8 +53,9 @@ public: virtual grpc::Status Authorize( grpc::ServerContext* ctx, const yandex::cloud::priv::servicecontrol::v1::AuthorizeRequest* request, - yandex::cloud::priv::servicecontrol::v1::AuthorizeResponse* response) override { - const TString& token = request->subject().user_account().id() + "-" + request->permission() + "-" + request->resource_path(0).id(); + yandex::cloud::priv::servicecontrol::v1::AuthorizeResponse* response) override + { + const TString& token = request->signature().access_key_id() + request->iam_token() + "-" + request->permission() + "-" + request->resource_path(0).id(); auto it = AuthorizeData.find(token); if (it != AuthorizeData.end()) { response->CopyFrom(it->second.Response); diff --git a/ydb/core/yq/libs/control_plane_proxy/utils.h b/ydb/core/yq/libs/control_plane_proxy/utils.h index 2a443edd2d..96e901b5f8 100644 --- a/ydb/core/yq/libs/control_plane_proxy/utils.h +++ b/ydb/core/yq/libs/control_plane_proxy/utils.h @@ -39,9 +39,17 @@ inline TString ExtractServiceAccountId(const YandexQuery::TestConnectionRequest& return ExtractServiceAccountIdImpl(c.setting()); } -template<typename T> -TString ExtractServiceAccountId(const T& c) { +inline TString ExtractServiceAccountId(const YandexQuery::CreateConnectionRequest& c) { + return ExtractServiceAccountIdImpl(c.content().setting()); +} + +inline TString ExtractServiceAccountId(const YandexQuery::ModifyConnectionRequest& c) { return ExtractServiceAccountIdImpl(c.content().setting()); } +template<typename T> +TString ExtractServiceAccountId(const T&) { + return {}; +} + } // namespace NYq diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index 7ec3cee164..80477524be 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -3,6 +3,7 @@ #include "shard_iterator.h" #include "next_token.h" +#include <ydb/core/grpc_services/service_datastreams.h> #include <ydb/core/grpc_services/grpc_request_proxy.h> #include <ydb/core/grpc_services/rpc_deferrable.h> #include <ydb/core/grpc_services/rpc_scheme_base.h> @@ -22,14 +23,22 @@ using namespace NKikimrClient; using grpc::Status; + namespace NKikimr::NDataStreams::V1 { const TString YDS_SERVICE_TYPE = "data-streams"; using namespace NGRpcService; using namespace NGRpcProxy::V1; + namespace { + template <class TRequest> + const TRequest* GetRequest(NGRpcService::IRequestOpCtx *request) + { + return dynamic_cast<const TRequest*>(request->GetRequest()); + } + ui32 PartitionWriteSpeedInBytesPerSec(ui32 speedInKbPerSec) { return speedInKbPerSec == 0 ? 1024 * 1024 : speedInKbPerSec * 1024; } @@ -102,10 +111,10 @@ namespace NKikimr::NDataStreams::V1 { class TCreateStreamActor : public TPQGrpcSchemaBase<TCreateStreamActor, NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest> { using TBase = TPQGrpcSchemaBase<TCreateStreamActor, TEvDataStreamsCreateStreamRequest>; + using TProtoRequest = typename TBase::TProtoRequest; - TActorId NewSchemeCache; public: - TCreateStreamActor(NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest* request, TActorId newSchemeCache); + TCreateStreamActor(NKikimr::NGRpcService::IRequestOpCtx* request); ~TCreateStreamActor() = default; void Bootstrap(const NActors::TActorContext& ctx); @@ -117,11 +126,9 @@ namespace NKikimr::NDataStreams::V1 { }; - TCreateStreamActor::TCreateStreamActor(NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest* request, TActorId newSchemeCache) - : TBase(request, request->GetProtoRequest()->stream_name()) - , NewSchemeCache(newSchemeCache) + TCreateStreamActor::TCreateStreamActor(NKikimr::NGRpcService::IRequestOpCtx* request) + : TBase(request, GetRequest<TProtoRequest>(request)->stream_name()) { - Y_UNUSED(NewSchemeCache); } void TCreateStreamActor::Bootstrap(const NActors::TActorContext& ctx) { @@ -194,9 +201,10 @@ namespace NKikimr::NDataStreams::V1 { class TDeleteStreamActor : public TPQGrpcSchemaBase<TDeleteStreamActor, NKikimr::NGRpcService::TEvDataStreamsDeleteStreamRequest> { using TBase = TPQGrpcSchemaBase<TDeleteStreamActor, TEvDataStreamsDeleteStreamRequest>; + using TProtoRequest = typename TBase::TProtoRequest; public: - TDeleteStreamActor(NKikimr::NGRpcService::TEvDataStreamsDeleteStreamRequest* request); + TDeleteStreamActor(NKikimr::NGRpcService::IRequestOpCtx* request); ~TDeleteStreamActor() = default; void Bootstrap(const NActors::TActorContext& ctx); @@ -211,9 +219,9 @@ namespace NKikimr::NDataStreams::V1 { bool EnforceDeletion; }; - TDeleteStreamActor::TDeleteStreamActor(NKikimr::NGRpcService::TEvDataStreamsDeleteStreamRequest* request) - : TBase(request, request->GetProtoRequest()->stream_name()) - , EnforceDeletion{request->GetProtoRequest()->enforce_consumer_deletion()} + TDeleteStreamActor::TDeleteStreamActor(NKikimr::NGRpcService::IRequestOpCtx* request) + : TBase(request, GetRequest<TProtoRequest>(request)->stream_name()) + , EnforceDeletion{GetProtoRequest()->enforce_consumer_deletion()} { } @@ -256,10 +264,10 @@ namespace NKikimr::NDataStreams::V1 { class TUpdateShardCountActor : public TUpdateSchemeActor<TUpdateShardCountActor, TEvDataStreamsUpdateShardCountRequest> { using TBase = TUpdateSchemeActor<TUpdateShardCountActor, TEvDataStreamsUpdateShardCountRequest>; - + using TProtoRequest = typename TBase::TProtoRequest; public: - TUpdateShardCountActor(TEvDataStreamsUpdateShardCountRequest* request) - : TBase(request, request->GetProtoRequest()->stream_name()) + TUpdateShardCountActor(NKikimr::NGRpcService::IRequestOpCtx* request) + : TBase(request, GetRequest<TProtoRequest>(request)->stream_name()) { } @@ -296,10 +304,11 @@ namespace NKikimr::NDataStreams::V1 { class TUpdateStreamActor : public TUpdateSchemeActor<TUpdateStreamActor, TEvDataStreamsUpdateStreamRequest> { using TBase = TUpdateSchemeActor<TUpdateStreamActor, TEvDataStreamsUpdateStreamRequest>; + using TProtoRequest = typename TBase::TProtoRequest; public: - TUpdateStreamActor(TEvDataStreamsUpdateStreamRequest* request) - : TBase(request, request->GetProtoRequest()->stream_name()) + TUpdateStreamActor(NKikimr::NGRpcService::IRequestOpCtx* request) + : TBase(request, GetRequest<TProtoRequest>(request)->stream_name()) { } @@ -342,10 +351,11 @@ namespace NKikimr::NDataStreams::V1 { class TSetWriteQuotaActor : public TUpdateSchemeActor<TSetWriteQuotaActor, TEvDataStreamsSetWriteQuotaRequest> { using TBase = TUpdateSchemeActor<TSetWriteQuotaActor, TEvDataStreamsSetWriteQuotaRequest>; + using TProtoRequest = typename TBase::TProtoRequest; public: - TSetWriteQuotaActor(TEvDataStreamsSetWriteQuotaRequest* request) - : TBase(request, request->GetProtoRequest()->stream_name()) + TSetWriteQuotaActor(NKikimr::NGRpcService::IRequestOpCtx* request) + : TBase(request, GetRequest<TProtoRequest>(request)->stream_name()) { } @@ -384,10 +394,11 @@ namespace NKikimr::NDataStreams::V1 { template<class TEvProto> class TSetStreamRetentionPeriodActor : public TUpdateSchemeActor<TSetStreamRetentionPeriodActor<TEvProto>, TEvProto> { using TBase = TUpdateSchemeActor<TSetStreamRetentionPeriodActor<TEvProto>, TEvProto>; + using TProtoRequest = typename TBase::TProtoRequest; public: - TSetStreamRetentionPeriodActor(TEvProto* request, bool shouldIncrease) - : TBase(request, request->GetProtoRequest()->stream_name()) + TSetStreamRetentionPeriodActor(NKikimr::NGRpcService::IRequestOpCtx* request, bool shouldIncrease) + : TBase(request, GetRequest<TProtoRequest>(request)->stream_name()) , ShouldIncrease(shouldIncrease) { } @@ -424,9 +435,10 @@ namespace NKikimr::NDataStreams::V1 { , public TCdcStreamCompatible { using TBase = TPQGrpcSchemaBase<TDescribeStreamActor, TEvDataStreamsDescribeStreamRequest>; + using TProtoRequest = typename TBase::TProtoRequest; public: - TDescribeStreamActor(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamRequest* request); + TDescribeStreamActor(NKikimr::NGRpcService::IRequestOpCtx* request); ~TDescribeStreamActor() = default; void Bootstrap(const NActors::TActorContext& ctx); @@ -473,8 +485,8 @@ namespace NKikimr::NDataStreams::V1 { std::map<ui64, std::pair<ui64, ui64>> StartEndOffsetsPerPartition; }; - TDescribeStreamActor::TDescribeStreamActor(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamRequest* request) - : TBase(request, request->GetProtoRequest()->stream_name()) + TDescribeStreamActor::TDescribeStreamActor(NKikimr::NGRpcService::IRequestOpCtx* request) + : TBase(request, GetRequest<TProtoRequest>(request)->stream_name()) { } @@ -594,7 +606,7 @@ namespace NKikimr::NDataStreams::V1 { using TBase = TRpcSchemeRequestActor<TListStreamsActor, TEvDataStreamsListStreamsRequest>; public: - TListStreamsActor(NKikimr::NGRpcService::TEvDataStreamsListStreamsRequest* request, TActorId newSchemeCache); + TListStreamsActor(NKikimr::NGRpcService::IRequestOpCtx* request); ~TListStreamsActor() = default; void Bootstrap(const NActors::TActorContext& ctx); @@ -620,12 +632,10 @@ namespace NKikimr::NDataStreams::V1 { ui32 RequestsInFlight = 0; std::vector<std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet>> WaitingList; std::vector<TString> Topics; - TActorId NewSchemeCache; }; - TListStreamsActor::TListStreamsActor(NKikimr::NGRpcService::TEvDataStreamsListStreamsRequest* request, TActorId newSchemeCache) + TListStreamsActor::TListStreamsActor(NKikimr::NGRpcService::IRequestOpCtx* request) : TBase(request) - , NewSchemeCache(newSchemeCache) { } @@ -649,7 +659,7 @@ namespace NKikimr::NDataStreams::V1 { void TListStreamsActor::SendPendingRequests(const TActorContext& ctx) { if (RequestsInFlight < MAX_IN_FLIGHT && WaitingList.size() > 0) { - ctx.Send(NewSchemeCache, WaitingList.back().release()); + ctx.Send(MakeSchemeCacheID(), WaitingList.back().release()); WaitingList.pop_back(); RequestsInFlight++; } @@ -749,9 +759,10 @@ namespace NKikimr::NDataStreams::V1 { , public TCdcStreamCompatible { using TBase = TPQGrpcSchemaBase<TListStreamConsumersActor, TEvDataStreamsListStreamConsumersRequest>; + using TProtoRequest = typename TBase::TProtoRequest; public: - TListStreamConsumersActor(NKikimr::NGRpcService::TEvDataStreamsListStreamConsumersRequest* request); + TListStreamConsumersActor(NKikimr::NGRpcService::IRequestOpCtx* request); ~TListStreamConsumersActor() = default; void Bootstrap(const NActors::TActorContext& ctx); @@ -770,15 +781,15 @@ namespace NKikimr::NDataStreams::V1 { ui32 MaxResults = DEFAULT_MAX_RESULTS; TNextToken NextToken; }; - TListStreamConsumersActor::TListStreamConsumersActor(NKikimr::NGRpcService::TEvDataStreamsListStreamConsumersRequest* request) - : TBase(request, TNextToken(request->GetProtoRequest()->next_token()).IsValid() ? - TNextToken(request->GetProtoRequest()->next_token()).GetStreamArn() : - request->GetProtoRequest()->stream_arn()) - , NextToken(request->GetProtoRequest()->next_token()) + TListStreamConsumersActor::TListStreamConsumersActor(NKikimr::NGRpcService::IRequestOpCtx* request) + : TBase(request, TNextToken(GetRequest<TProtoRequest>(request)->next_token()).IsValid() ? + TNextToken(GetRequest<TProtoRequest>(request)->next_token()).GetStreamArn() : + GetRequest<TProtoRequest>(request)->stream_arn()) + , NextToken(GetRequest<TProtoRequest>(request)->next_token()) { - if (request->GetProtoRequest()->next_token().empty()) { - StreamArn = request->GetProtoRequest()->stream_arn(); - MaxResults = request->GetProtoRequest()->max_results(); + if (GetProtoRequest()->next_token().empty()) { + StreamArn = GetProtoRequest()->stream_arn(); + MaxResults = GetProtoRequest()->max_results(); NextToken = TNextToken(StreamArn, 0, MaxResults, TInstant::Now().MilliSeconds()); } else { StreamArn = NextToken.GetStreamArn(); @@ -871,9 +882,10 @@ namespace NKikimr::NDataStreams::V1 { , public TCdcStreamCompatible { using TBase = TUpdateSchemeActor<TRegisterStreamConsumerActor, TEvDataStreamsRegisterStreamConsumerRequest>; + using TProtoRequest = typename TBase::TProtoRequest; public: - TRegisterStreamConsumerActor(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest* request); + TRegisterStreamConsumerActor(NKikimr::NGRpcService::IRequestOpCtx* request); ~TRegisterStreamConsumerActor() = default; void Bootstrap(const NActors::TActorContext& ctx); @@ -886,9 +898,9 @@ namespace NKikimr::NDataStreams::V1 { private: TString ConsumerName; }; - TRegisterStreamConsumerActor::TRegisterStreamConsumerActor(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest* request) - : TBase(request, request->GetProtoRequest()->stream_arn()) - , ConsumerName(request->GetProtoRequest()->consumer_name()) + TRegisterStreamConsumerActor::TRegisterStreamConsumerActor(NKikimr::NGRpcService::IRequestOpCtx* request) + : TBase(request, GetRequest<TProtoRequest>(request)->stream_arn()) + , ConsumerName(GetRequest<TProtoRequest>(request)->consumer_name()) { } @@ -948,9 +960,10 @@ namespace NKikimr::NDataStreams::V1 { , public TCdcStreamCompatible { using TBase = TUpdateSchemeActor<TDeregisterStreamConsumerActor, TEvDataStreamsDeregisterStreamConsumerRequest>; + using TProtoRequest = typename TBase::TProtoRequest; public: - TDeregisterStreamConsumerActor(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest* request); + TDeregisterStreamConsumerActor(NKikimr::NGRpcService::IRequestOpCtx* request); ~TDeregisterStreamConsumerActor() = default; void Bootstrap(const NActors::TActorContext& ctx); @@ -962,9 +975,9 @@ namespace NKikimr::NDataStreams::V1 { private: TString ConsumerName; }; - TDeregisterStreamConsumerActor::TDeregisterStreamConsumerActor(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest* request) - : TBase(request, request->GetProtoRequest()->stream_arn()) - , ConsumerName(request->GetProtoRequest()->consumer_name()) + TDeregisterStreamConsumerActor::TDeregisterStreamConsumerActor(NKikimr::NGRpcService::IRequestOpCtx* request) + : TBase(request, GetRequest<TProtoRequest>(request)->stream_arn()) + , ConsumerName(GetRequest<TProtoRequest>(request)->consumer_name()) { } @@ -998,9 +1011,10 @@ namespace NKikimr::NDataStreams::V1 { , public TCdcStreamCompatible { using TBase = TPQGrpcSchemaBase<TGetShardIteratorActor, TEvDataStreamsGetShardIteratorRequest>; + using TProtoRequest = typename TBase::TProtoRequest; public: - TGetShardIteratorActor(NKikimr::NGRpcService::TEvDataStreamsGetShardIteratorRequest* request, NActors::TActorId newSchemeCache); + TGetShardIteratorActor(NKikimr::NGRpcService::IRequestOpCtx* request); ~TGetShardIteratorActor() = default; void Bootstrap(const NActors::TActorContext& ctx); @@ -1014,7 +1028,6 @@ namespace NKikimr::NDataStreams::V1 { void SendResponse(const TActorContext& ctx, const TShardIterator& shardIt); std::optional<ui32> SequenceNumberToInt(const TString& sequenceNumberStr); - TActorId NewSchemeCache; TString StreamName; TString ShardId; TIteratorType IteratorType; @@ -1022,12 +1035,11 @@ namespace NKikimr::NDataStreams::V1 { ui64 ReadTimestampMs; }; - TGetShardIteratorActor::TGetShardIteratorActor(NKikimr::NGRpcService::TEvDataStreamsGetShardIteratorRequest* request, NActors::TActorId newSchemeCache) - : TBase(request, request->GetProtoRequest()->stream_name()) - , NewSchemeCache(std::move(newSchemeCache)) - , StreamName{request->GetProtoRequest()->stream_name()} - , ShardId{request->GetProtoRequest()->shard_id()} - , IteratorType{request->GetProtoRequest()->shard_iterator_type()} + TGetShardIteratorActor::TGetShardIteratorActor(NKikimr::NGRpcService::IRequestOpCtx* request) + : TBase(request, GetRequest<TProtoRequest>(request)->stream_name()) + , StreamName{GetRequest<TProtoRequest>(request)->stream_name()} + , ShardId{GetRequest<TProtoRequest>(request)->shard_id()} + , IteratorType{GetRequest<TProtoRequest>(request)->shard_iterator_type()} , SequenceNumber{0} , ReadTimestampMs{0} { @@ -1140,12 +1152,13 @@ namespace NKikimr::NDataStreams::V1 { , public TCdcStreamCompatible { using TBase = TPQGrpcSchemaBase<TGetRecordsActor, TEvDataStreamsGetRecordsRequest>; + using TProtoRequest = typename TBase::TProtoRequest; static constexpr ui32 READ_TIMEOUT_MS = 150; static constexpr i32 MAX_LIMIT = 10000; public: - TGetRecordsActor(TEvDataStreamsGetRecordsRequest* request, const TActorId& newSchemeCache); + TGetRecordsActor(NKikimr::NGRpcService::IRequestOpCtx* request); ~TGetRecordsActor() = default; void Bootstrap(const NActors::TActorContext& ctx); @@ -1166,20 +1179,17 @@ namespace NKikimr::NDataStreams::V1 { TString StreamName; ui64 TabletId; i32 Limit; - TActorId NewSchemeCache; TActorId PipeClient; }; - TGetRecordsActor::TGetRecordsActor(TEvDataStreamsGetRecordsRequest* request, - const TActorId& newSchemeCache) - : TBase(request, TShardIterator(request->GetProtoRequest()->shard_iterator()).IsValid() - ? TShardIterator(request->GetProtoRequest()->shard_iterator()).GetStreamName() + TGetRecordsActor::TGetRecordsActor(NKikimr::NGRpcService::IRequestOpCtx* request) + : TBase(request, TShardIterator(GetRequest<TProtoRequest>(request)->shard_iterator()).IsValid() + ? TShardIterator(GetRequest<TProtoRequest>(request)->shard_iterator()).GetStreamName() : "undefined") - , ShardIterator{request->GetProtoRequest()->shard_iterator()} + , ShardIterator{GetRequest<TProtoRequest>(request)->shard_iterator()} , StreamName{ShardIterator.IsValid() ? ShardIterator.GetStreamName() : "undefined"} , TabletId{0} - , Limit{request->GetProtoRequest()->limit()} - , NewSchemeCache{newSchemeCache} + , Limit{GetRequest<TProtoRequest>(request)->limit()} { } @@ -1363,9 +1373,10 @@ namespace NKikimr::NDataStreams::V1 { , public TCdcStreamCompatible { using TBase = TPQGrpcSchemaBase<TListShardsActor, TEvDataStreamsListShardsRequest>; + using TProtoRequest = typename TBase::TProtoRequest; public: - TListShardsActor(NKikimr::NGRpcService::TEvDataStreamsListShardsRequest* request, NActors::TActorId newSchemeCache); + TListShardsActor(NKikimr::NGRpcService::IRequestOpCtx* request); ~TListShardsActor() = default; void Bootstrap(const NActors::TActorContext& ctx); @@ -1386,7 +1397,6 @@ namespace NKikimr::NDataStreams::V1 { static constexpr ui32 MIN_MAX_RESULTS = 1; static constexpr ui32 DEFAULT_MAX_RESULTS = 100; - TActorId NewSchemeCache; TString StreamName; TShardFilter ShardFilter; TNextToken NextToken; @@ -1399,17 +1409,16 @@ namespace NKikimr::NDataStreams::V1 { std::vector<TActorId> Pipes; }; - TListShardsActor::TListShardsActor(NKikimr::NGRpcService::TEvDataStreamsListShardsRequest* request, NActors::TActorId newSchemeCache) - : TBase(request, request->GetProtoRequest()->stream_name()) - , NewSchemeCache(std::move(newSchemeCache)) - , StreamName{request->GetProtoRequest()->stream_name()} - , ShardFilter{request->GetProtoRequest()->shard_filter()} - , NextToken{request->GetProtoRequest()->next_token()} + TListShardsActor::TListShardsActor(NKikimr::NGRpcService::IRequestOpCtx* request) + : TBase(request, GetRequest<TProtoRequest>(request)->stream_name()) + , StreamName{GetProtoRequest()->stream_name()} + , ShardFilter{GetProtoRequest()->shard_filter()} + , NextToken{GetProtoRequest()->next_token()} , GotOffsetResponds{0} { - if (request->GetProtoRequest()->next_token().empty()) { - StreamName = request->GetProtoRequest()->stream_name(); - MaxResults = request->GetProtoRequest()->max_results(); + if (GetProtoRequest()->next_token().empty()) { + StreamName = GetProtoRequest()->stream_name(); + MaxResults = GetProtoRequest()->max_results(); NextToken = TNextToken(StreamName, 0, MaxResults, TInstant::Now().MilliSeconds()); } else { StreamName = NextToken.GetStreamArn(); @@ -1633,9 +1642,10 @@ namespace NKikimr::NDataStreams::V1 { class TDescribeStreamSummaryActor : public TPQGrpcSchemaBase<TDescribeStreamSummaryActor, TEvDataStreamsDescribeStreamSummaryRequest> { using TBase = TPQGrpcSchemaBase<TDescribeStreamSummaryActor, TEvDataStreamsDescribeStreamSummaryRequest>; + using TProtoRequest = typename TBase::TProtoRequest; public: - TDescribeStreamSummaryActor(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamSummaryRequest* request); + TDescribeStreamSummaryActor(NKikimr::NGRpcService::IRequestOpCtx* request); ~TDescribeStreamSummaryActor() = default; void Bootstrap(const NActors::TActorContext& ctx); @@ -1652,9 +1662,9 @@ namespace NKikimr::NDataStreams::V1 { }; TDescribeStreamSummaryActor::TDescribeStreamSummaryActor( - NKikimr::NGRpcService::TEvDataStreamsDescribeStreamSummaryRequest* request + NKikimr::NGRpcService::IRequestOpCtx* request ) - : TBase(request, request->GetProtoRequest()->stream_name()) + : TBase(request, GetRequest<TProtoRequest>(request)->stream_name()) { } @@ -1723,7 +1733,7 @@ namespace NKikimr::NDataStreams::V1 { using TBase = TRpcSchemeRequestActor<TNotImplementedRequestActor, TEvRequest>; public: - TNotImplementedRequestActor(TEvRequest* request) + TNotImplementedRequestActor(NKikimr::NGRpcService::IRequestOpCtx* request) : TBase(request) { } @@ -1736,258 +1746,77 @@ namespace NKikimr::NDataStreams::V1 { this->Die(ctx); } }; - - //----------------------------------------------------------------------------------- - - IActor* CreateDataStreamsService(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TActorId newSchemeCache) { - return new TDataStreamsService(counters, newSchemeCache); - } - - TDataStreamsService::TDataStreamsService(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TActorId newSchemeCache) - : Counters(counters) - , NewSchemeCache(newSchemeCache) - { - } - - void TDataStreamsService::Bootstrap(const TActorContext&) { - Become(&TThis::StateFunc); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TCreateStreamActor(ev->Release().Release(), NewSchemeCache)); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsDeleteStreamRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TDeleteStreamActor(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TDescribeStreamActor(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TRegisterStreamConsumerActor(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TDeregisterStreamConsumerActor(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamConsumerRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsDescribeStreamConsumerRequest>(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsPutRecordRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TPutRecordActor(ev->Release().Release(), NewSchemeCache)); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsListStreamsRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TListStreamsActor(ev->Release().Release(), NewSchemeCache)); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsListShardsRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TListShardsActor(ev->Release().Release(), NewSchemeCache)); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TPutRecordsActor(ev->Release().Release(), NewSchemeCache)); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsGetRecordsRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TGetRecordsActor(ev->Release().Release(), NewSchemeCache)); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsGetShardIteratorRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TGetShardIteratorActor(ev->Release().Release(), NewSchemeCache)); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsSubscribeToShardRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsSubscribeToShardRequest>(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeLimitsRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsDescribeLimitsRequest>(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamSummaryRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TDescribeStreamSummaryActor(ev->Release().Release())); - } - - void TDataStreamsService::Handle(TEvDataStreamsDecreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TSetStreamRetentionPeriodActor<TEvDataStreamsDecreaseStreamRetentionPeriodRequest>(ev->Release().Release(), false)); - } - - void TDataStreamsService::Handle(TEvDataStreamsIncreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TSetStreamRetentionPeriodActor<TEvDataStreamsIncreaseStreamRetentionPeriodRequest>(ev->Release().Release(), true)); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsUpdateShardCountRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TUpdateShardCountActor(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsUpdateStreamRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TUpdateStreamActor(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsListStreamConsumersRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TListStreamConsumersActor(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsAddTagsToStreamRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsAddTagsToStreamRequest>(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsDisableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsDisableEnhancedMonitoringRequest>(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsEnableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsEnableEnhancedMonitoringRequest>(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsListTagsForStreamRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsListTagsForStreamRequest>(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsMergeShardsRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsMergeShardsRequest>(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsRemoveTagsFromStreamRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsRemoveTagsFromStreamRequest>(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsSplitShardRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsSplitShardRequest>(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsStartStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsStartStreamEncryptionRequest>(ev->Release().Release())); - } - - void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsStopStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsStopStreamEncryptionRequest>(ev->Release().Release())); - } - -} - - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDeleteStreamRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); } -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} +namespace NKikimr::NGRpcService { -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsPutRecordRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} +using namespace NDataStreams::V1; -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); +#define DECLARE_RPC(name) template<> IActor* TEvDataStreams##name##Request::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { \ + return new T##name##Actor(msg);\ +}\ +void DoDataStreams##name##Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {\ + TActivationContext::AsActorContext().Register(new T##name##Actor(p.release())); \ } -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); +#define DECLARE_RPC_NI(name) template<> IActor* TEvDataStreams##name##Request::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { \ + return new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreams##name##Request>(msg);\ +}\ +void DoDataStreams##name##Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {\ + TActivationContext::AsActorContext().Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreams##name##Request>(p.release()));\ } -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamConsumerRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsListStreamsRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); +DECLARE_RPC(CreateStream); +DECLARE_RPC(DeleteStream); +DECLARE_RPC(DescribeStream); +DECLARE_RPC(PutRecord); +DECLARE_RPC(RegisterStreamConsumer); +DECLARE_RPC(DeregisterStreamConsumer); +DECLARE_RPC_NI(DescribeStreamConsumer); +DECLARE_RPC(ListStreams); +DECLARE_RPC(ListShards); +DECLARE_RPC(PutRecords); +DECLARE_RPC(GetRecords); +DECLARE_RPC(GetShardIterator); +DECLARE_RPC_NI(SubscribeToShard); +DECLARE_RPC_NI(DescribeLimits); +DECLARE_RPC(DescribeStreamSummary); +DECLARE_RPC(UpdateShardCount); +DECLARE_RPC(ListStreamConsumers); +DECLARE_RPC_NI(AddTagsToStream); +DECLARE_RPC_NI(DisableEnhancedMonitoring); +DECLARE_RPC_NI(EnableEnhancedMonitoring); +DECLARE_RPC_NI(ListTagsForStream); +DECLARE_RPC(UpdateStream); +DECLARE_RPC(SetWriteQuota); +DECLARE_RPC_NI(MergeShards); +DECLARE_RPC_NI(RemoveTagsFromStream); +DECLARE_RPC_NI(SplitShard); +DECLARE_RPC_NI(StartStreamEncryption); +DECLARE_RPC_NI(StopStreamEncryption); + + + +void DoDataStreamsDecreaseStreamRetentionPeriodRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + auto* req = dynamic_cast<TEvDataStreamsDecreaseStreamRetentionPeriodRequest*>(p.release()); + Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + TActivationContext::AsActorContext().Register(new TSetStreamRetentionPeriodActor<TEvDataStreamsDecreaseStreamRetentionPeriodRequest>(req, false)); } - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsListShardsRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); +template<> +IActor* TEvDataStreamsDecreaseStreamRetentionPeriodRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { + return new TSetStreamRetentionPeriodActor<TEvDataStreamsDecreaseStreamRetentionPeriodRequest>(dynamic_cast<TEvDataStreamsDecreaseStreamRetentionPeriodRequest*>(msg), false); } -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); +void DoDataStreamsIncreaseStreamRetentionPeriodRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + auto* req = dynamic_cast<TEvDataStreamsIncreaseStreamRetentionPeriodRequest*>(p.release()); + Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper"); + TActivationContext::AsActorContext().Register(new TSetStreamRetentionPeriodActor<TEvDataStreamsIncreaseStreamRetentionPeriodRequest>(req, true)); } - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsGetRecordsRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsGetShardIteratorRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsSubscribeToShardRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeLimitsRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); +template<> +IActor* TEvDataStreamsIncreaseStreamRetentionPeriodRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { + return new TSetStreamRetentionPeriodActor<TEvDataStreamsIncreaseStreamRetentionPeriodRequest>(dynamic_cast<TEvDataStreamsIncreaseStreamRetentionPeriodRequest*>(msg), true); } -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamSummaryRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDecreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsIncreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsUpdateShardCountRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsListStreamConsumersRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsAddTagsToStreamRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDisableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsEnableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsListTagsForStreamRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsUpdateStreamRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsSetWriteQuotaRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsMergeShardsRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsRemoveTagsFromStreamRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsSplitShardRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} - -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsStartStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); -} -void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsStopStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx) { - ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release()); } diff --git a/ydb/services/datastreams/datastreams_proxy.h b/ydb/services/datastreams/datastreams_proxy.h index 1712e171ee..72b9e1ae4b 100644 --- a/ydb/services/datastreams/datastreams_proxy.h +++ b/ydb/services/datastreams/datastreams_proxy.h @@ -2,97 +2,48 @@ #include "events.h" +#include <ydb/public/api/grpc/draft/ydb_datastreams_v1.pb.h> + #include <ydb/core/client/server/grpc_base.h> #include <ydb/core/grpc_services/rpc_calls.h> #include <library/cpp/grpc/server/grpc_server.h> - #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/actorsystem.h> -namespace NKikimr::NDataStreams::V1 { - - inline TActorId GetDataStreamsServiceActorID() { - return TActorId(0, "PqDsProxy"); - } - - IActor* CreateDataStreamsService(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TActorId newSchemeCache); - - class TDataStreamsService : public NActors::TActorBootstrapped<TDataStreamsService> { - public: - TDataStreamsService(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TActorId newSchemeCache); - - void Bootstrap(const TActorContext& ctx); - - private: - STFUNC(StateFunc) { - switch (ev->GetTypeRewrite()) { - HFunc(NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsPutRecordRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsDeleteStreamRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamConsumerRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsListStreamsRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsListShardsRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsGetRecordsRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsGetShardIteratorRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsSubscribeToShardRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsDescribeLimitsRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamSummaryRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsDecreaseStreamRetentionPeriodRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsIncreaseStreamRetentionPeriodRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsUpdateShardCountRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsListStreamConsumersRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsAddTagsToStreamRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsDisableEnhancedMonitoringRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsEnableEnhancedMonitoringRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsListTagsForStreamRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsMergeShardsRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsRemoveTagsFromStreamRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsSplitShardRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsStartStreamEncryptionRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsStopStreamEncryptionRequest, Handle); - HFunc(NKikimr::NGRpcService::TEvDataStreamsUpdateStreamRequest, Handle); - } - } - - void Handle(NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsDeleteStreamRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsPutRecordRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamConsumerRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsListStreamsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsListShardsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsGetRecordsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsGetShardIteratorRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsSubscribeToShardRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeLimitsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamSummaryRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsDecreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsIncreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsUpdateShardCountRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsUpdateStreamRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsListStreamConsumersRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsAddTagsToStreamRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsDisableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsEnableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsListTagsForStreamRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsMergeShardsRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsRemoveTagsFromStreamRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsSplitShardRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsStartStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx); - void Handle(NKikimr::NGRpcService::TEvDataStreamsStopStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx); - - private: - TIntrusivePtr<NMonitoring::TDynamicCounters> Counters; - - TActorId NewSchemeCache; - }; +namespace NKikimr { +namespace NGRpcService { + +using TEvDataStreamsCreateStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::CreateStreamRequest, Ydb::DataStreams::V1::CreateStreamResponse>; +using TEvDataStreamsDeleteStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DeleteStreamRequest, Ydb::DataStreams::V1::DeleteStreamResponse>; +using TEvDataStreamsDescribeStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DescribeStreamRequest, Ydb::DataStreams::V1::DescribeStreamResponse>; +using TEvDataStreamsRegisterStreamConsumerRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::RegisterStreamConsumerRequest, Ydb::DataStreams::V1::RegisterStreamConsumerResponse>; +using TEvDataStreamsDeregisterStreamConsumerRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DeregisterStreamConsumerRequest, Ydb::DataStreams::V1::DeregisterStreamConsumerResponse>; +using TEvDataStreamsDescribeStreamConsumerRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DescribeStreamConsumerRequest, Ydb::DataStreams::V1::DescribeStreamConsumerResponse>; +using TEvDataStreamsPutRecordRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::PutRecordRequest, Ydb::DataStreams::V1::PutRecordResponse>; +using TEvDataStreamsListStreamsRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::ListStreamsRequest, Ydb::DataStreams::V1::ListStreamsResponse>; +using TEvDataStreamsListShardsRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::ListShardsRequest, Ydb::DataStreams::V1::ListShardsResponse>; +using TEvDataStreamsPutRecordsRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::PutRecordsRequest, Ydb::DataStreams::V1::PutRecordsResponse>; +using TEvDataStreamsGetRecordsRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::GetRecordsRequest, Ydb::DataStreams::V1::GetRecordsResponse>; +using TEvDataStreamsGetShardIteratorRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::GetShardIteratorRequest, Ydb::DataStreams::V1::GetShardIteratorResponse>; +using TEvDataStreamsSubscribeToShardRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::SubscribeToShardRequest, Ydb::DataStreams::V1::SubscribeToShardResponse>; +using TEvDataStreamsDescribeLimitsRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DescribeLimitsRequest, Ydb::DataStreams::V1::DescribeLimitsResponse>; +using TEvDataStreamsDescribeStreamSummaryRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DescribeStreamSummaryRequest, Ydb::DataStreams::V1::DescribeStreamSummaryResponse>; +using TEvDataStreamsDecreaseStreamRetentionPeriodRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DecreaseStreamRetentionPeriodRequest, Ydb::DataStreams::V1::DecreaseStreamRetentionPeriodResponse>; +using TEvDataStreamsIncreaseStreamRetentionPeriodRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::IncreaseStreamRetentionPeriodRequest, Ydb::DataStreams::V1::IncreaseStreamRetentionPeriodResponse>; +using TEvDataStreamsUpdateShardCountRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::UpdateShardCountRequest, Ydb::DataStreams::V1::UpdateShardCountResponse>; +using TEvDataStreamsUpdateStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::UpdateStreamRequest, Ydb::DataStreams::V1::UpdateStreamResponse>; +using TEvDataStreamsSetWriteQuotaRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::SetWriteQuotaRequest, Ydb::DataStreams::V1::SetWriteQuotaResponse>; +using TEvDataStreamsListStreamConsumersRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::ListStreamConsumersRequest, Ydb::DataStreams::V1::ListStreamConsumersResponse>; +using TEvDataStreamsAddTagsToStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::AddTagsToStreamRequest, Ydb::DataStreams::V1::AddTagsToStreamResponse>; +using TEvDataStreamsDisableEnhancedMonitoringRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DisableEnhancedMonitoringRequest, Ydb::DataStreams::V1::DisableEnhancedMonitoringResponse>; +using TEvDataStreamsEnableEnhancedMonitoringRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::EnableEnhancedMonitoringRequest, Ydb::DataStreams::V1::EnableEnhancedMonitoringResponse>; +using TEvDataStreamsListTagsForStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::ListTagsForStreamRequest, Ydb::DataStreams::V1::ListTagsForStreamResponse>; +using TEvDataStreamsMergeShardsRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::MergeShardsRequest, Ydb::DataStreams::V1::MergeShardsResponse>; +using TEvDataStreamsRemoveTagsFromStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::RemoveTagsFromStreamRequest, Ydb::DataStreams::V1::RemoveTagsFromStreamResponse>; +using TEvDataStreamsSplitShardRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::SplitShardRequest, Ydb::DataStreams::V1::SplitShardResponse>; +using TEvDataStreamsStartStreamEncryptionRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::StartStreamEncryptionRequest, Ydb::DataStreams::V1::StartStreamEncryptionResponse>; +using TEvDataStreamsStopStreamEncryptionRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::StopStreamEncryptionRequest, Ydb::DataStreams::V1::StopStreamEncryptionResponse>; } +} diff --git a/ydb/services/datastreams/grpc_service.cpp b/ydb/services/datastreams/grpc_service.cpp index 7a18cd4dc6..564796ff94 100644 --- a/ydb/services/datastreams/grpc_service.cpp +++ b/ydb/services/datastreams/grpc_service.cpp @@ -1,14 +1,40 @@ #include "grpc_service.h" -#include "datastreams_proxy.h" - +#include <ydb/core/grpc_services/service_datastreams.h> +#include <ydb/core/base/counters.h> #include <ydb/core/base/appdata.h> - +#include <ydb/core/base/ticket_parser.h> #include <ydb/core/grpc_services/grpc_helper.h> -#include <ydb/core/grpc_services/rpc_calls.h> - +#include <ydb/core/grpc_services/base/base.h> #include <ydb/core/tx/scheme_board/cache.h> +namespace { + +using namespace NKikimr; + +void YdsProcessAttr(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData, NGRpcService::ICheckerIface* checker) { + static const std::vector<TString> allowedAttributes = {"folder_id", "service_account_id", "database_id"}; + //full list of permissions for compatibility. remove old permissions later. + static const TVector<TString> permissions = { + "ydb.streams.write", + "ydb.databases.list", + "ydb.databases.create", + "ydb.databases.connect" + }; + TVector<std::pair<TString, TString>> attributes; + attributes.reserve(schemeData.GetPathDescription().UserAttributesSize()); + for (const auto& attr : schemeData.GetPathDescription().GetUserAttributes()) { + if (std::find(allowedAttributes.begin(), allowedAttributes.end(), attr.GetKey()) != allowedAttributes.end()) { + attributes.emplace_back(attr.GetKey(), attr.GetValue()); + } + } + if (!attributes.empty()) { + checker->SetEntries({{permissions, attributes}}); + } +} + +} + namespace NKikimr::NGRpcService { TGRpcDataStreamsService::TGRpcDataStreamsService(NActors::TActorSystem *system, @@ -24,11 +50,6 @@ void TGRpcDataStreamsService::InitService(grpc::ServerCompletionQueue *cq, NGrpc { CQ_ = cq; - InitNewSchemeCache(); - IActor *proxyService = NDataStreams::V1::CreateDataStreamsService(Counters_, NewSchemeCache); - auto actorId = ActorSystem_->Register(proxyService, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); - ActorSystem_->RegisterLocalService(NDataStreams::V1::GetDataStreamsServiceActorID(), actorId); - SetupIncomingRequests(logger); } @@ -45,118 +66,57 @@ void TGRpcDataStreamsService::DecRequest() { Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); } -void TGRpcDataStreamsService::InitNewSchemeCache() { - auto appData = ActorSystem_->AppData<TAppData>(); - auto cacheCounters = GetServiceCounters(Counters_, "pqproxy|schemecache"); - auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters); - NewSchemeCache = ActorSystem_->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()), - TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId); -} void TGRpcDataStreamsService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + using std::placeholders::_1; + using std::placeholders::_2; + #ifdef ADD_REQUEST #error ADD_REQUEST macro already defined #endif -#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ - MakeIntrusive<TGRpcRequest<Ydb::DataStreams::V1::IN, Ydb::DataStreams::V1::OUT, TGRpcDataStreamsService>>(this, &Service_, CQ_, \ - [this](NGrpc::IRequestContextBase *ctx) { \ - ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ACTION; \ - }, &Ydb::DataStreams::V1::DataStreamsService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("data_streams", #NAME))->Run(); - - ADD_REQUEST(DescribeStream, DescribeStreamRequest, DescribeStreamResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDescribeStreamRequest(ctx)); - }) - ADD_REQUEST(CreateStream, CreateStreamRequest, CreateStreamResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsCreateStreamRequest(ctx)); - }) - ADD_REQUEST(ListStreams, ListStreamsRequest, ListStreamsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsListStreamsRequest(ctx)); - }) - ADD_REQUEST(DeleteStream, DeleteStreamRequest, DeleteStreamResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDeleteStreamRequest(ctx)); - }) - ADD_REQUEST(ListShards, ListShardsRequest, ListShardsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsListShardsRequest(ctx)); - }) - ADD_REQUEST(PutRecord, PutRecordRequest, PutRecordResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsPutRecordRequest(ctx)); - }) - ADD_REQUEST(PutRecords, PutRecordsRequest, PutRecordsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsPutRecordsRequest(ctx)); - }) - ADD_REQUEST(GetRecords, GetRecordsRequest, GetRecordsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsGetRecordsRequest(ctx)); - }) - ADD_REQUEST(GetShardIterator, GetShardIteratorRequest, GetShardIteratorResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsGetShardIteratorRequest(ctx)); - }) - ADD_REQUEST(SubscribeToShard, SubscribeToShardRequest, SubscribeToShardResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsSubscribeToShardRequest(ctx)); - }) - ADD_REQUEST(DescribeLimits, DescribeLimitsRequest, DescribeLimitsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDescribeLimitsRequest(ctx)); - }) - ADD_REQUEST(DescribeStreamSummary, DescribeStreamSummaryRequest, DescribeStreamSummaryResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDescribeStreamSummaryRequest(ctx)); - }) - ADD_REQUEST(DecreaseStreamRetentionPeriod, DecreaseStreamRetentionPeriodRequest, DecreaseStreamRetentionPeriodResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDecreaseStreamRetentionPeriodRequest(ctx)); - }) - ADD_REQUEST(IncreaseStreamRetentionPeriod, IncreaseStreamRetentionPeriodRequest, IncreaseStreamRetentionPeriodResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsIncreaseStreamRetentionPeriodRequest(ctx)); - }) - ADD_REQUEST(UpdateShardCount, UpdateShardCountRequest, UpdateShardCountResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsUpdateShardCountRequest(ctx)); - }) - ADD_REQUEST(RegisterStreamConsumer, RegisterStreamConsumerRequest, RegisterStreamConsumerResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsRegisterStreamConsumerRequest(ctx)); - }) - ADD_REQUEST(DeregisterStreamConsumer, DeregisterStreamConsumerRequest, DeregisterStreamConsumerResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDeregisterStreamConsumerRequest(ctx)); - }) - ADD_REQUEST(DescribeStreamConsumer, DescribeStreamConsumerRequest, DescribeStreamConsumerResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDescribeStreamConsumerRequest(ctx)); - }) - ADD_REQUEST(ListStreamConsumers, ListStreamConsumersRequest, ListStreamConsumersResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsListStreamConsumersRequest(ctx)); - }) - ADD_REQUEST(AddTagsToStream, AddTagsToStreamRequest, AddTagsToStreamResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsAddTagsToStreamRequest(ctx)); - }) - ADD_REQUEST(DisableEnhancedMonitoring, DisableEnhancedMonitoringRequest, DisableEnhancedMonitoringResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDisableEnhancedMonitoringRequest(ctx)); - }) - ADD_REQUEST(EnableEnhancedMonitoring, EnableEnhancedMonitoringRequest, EnableEnhancedMonitoringResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsEnableEnhancedMonitoringRequest(ctx)); - }) - ADD_REQUEST(ListTagsForStream, ListTagsForStreamRequest, ListTagsForStreamResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsListTagsForStreamRequest(ctx)); - }) - ADD_REQUEST(MergeShards, MergeShardsRequest, MergeShardsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsMergeShardsRequest(ctx)); - }) - ADD_REQUEST(RemoveTagsFromStream, RemoveTagsFromStreamRequest, RemoveTagsFromStreamResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsRemoveTagsFromStreamRequest(ctx)); - }) - ADD_REQUEST(SplitShard, SplitShardRequest, SplitShardResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsSplitShardRequest(ctx)); - }) - ADD_REQUEST(StartStreamEncryption, StartStreamEncryptionRequest, StartStreamEncryptionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsStartStreamEncryptionRequest(ctx)); - }) - ADD_REQUEST(StopStreamEncryption, StopStreamEncryptionRequest, StopStreamEncryptionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsStopStreamEncryptionRequest(ctx)); - }) - ADD_REQUEST(UpdateStream, UpdateStreamRequest, UpdateStreamResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsUpdateStreamRequest(ctx)); - }) - ADD_REQUEST(SetWriteQuota, SetWriteQuotaRequest, SetWriteQuotaResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsSetWriteQuotaRequest(ctx)); - }) +#define ADD_REQUEST(NAME, CB, ATTR) \ + MakeIntrusive<TGRpcRequest<Ydb::DataStreams::V1::NAME##Request, Ydb::DataStreams::V1::NAME##Response, TGRpcDataStreamsService>> \ + (this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestOperationCall<Ydb::DataStreams::V1::NAME##Request, Ydb::DataStreams::V1::NAME##Response> \ + (ctx, CB, TRequestAuxSettings{TRateLimiterMode::Off, ATTR})); \ + }, &Ydb::DataStreams::V1::DataStreamsService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("data_streams", #NAME))->Run(); + + ADD_REQUEST(DescribeStream, DoDataStreamsDescribeStreamRequest, nullptr) + ADD_REQUEST(CreateStream, DoDataStreamsCreateStreamRequest, nullptr) + ADD_REQUEST(ListStreams, DoDataStreamsListStreamsRequest, nullptr) + ADD_REQUEST(DeleteStream, DoDataStreamsDeleteStreamRequest, nullptr) + ADD_REQUEST(ListShards, DoDataStreamsListShardsRequest, nullptr) + ADD_REQUEST(PutRecord, DoDataStreamsPutRecordRequest, YdsProcessAttr) + ADD_REQUEST(PutRecords, DoDataStreamsPutRecordsRequest, YdsProcessAttr) + ADD_REQUEST(GetRecords, DoDataStreamsGetRecordsRequest, nullptr) + ADD_REQUEST(GetShardIterator, DoDataStreamsGetShardIteratorRequest, nullptr) + ADD_REQUEST(SubscribeToShard, DoDataStreamsSubscribeToShardRequest, nullptr) + ADD_REQUEST(DescribeLimits, DoDataStreamsDescribeLimitsRequest, nullptr) + ADD_REQUEST(DescribeStreamSummary, DoDataStreamsDescribeStreamSummaryRequest, nullptr) + ADD_REQUEST(DecreaseStreamRetentionPeriod, DoDataStreamsDecreaseStreamRetentionPeriodRequest, nullptr) + ADD_REQUEST(IncreaseStreamRetentionPeriod, DoDataStreamsIncreaseStreamRetentionPeriodRequest, nullptr) + ADD_REQUEST(UpdateShardCount, DoDataStreamsUpdateShardCountRequest, nullptr) + ADD_REQUEST(RegisterStreamConsumer, DoDataStreamsRegisterStreamConsumerRequest, nullptr) + ADD_REQUEST(DeregisterStreamConsumer, DoDataStreamsDeregisterStreamConsumerRequest, nullptr) + ADD_REQUEST(DescribeStreamConsumer, DoDataStreamsDescribeStreamConsumerRequest, nullptr) + ADD_REQUEST(ListStreamConsumers, DoDataStreamsListStreamConsumersRequest, nullptr) + ADD_REQUEST(AddTagsToStream, DoDataStreamsAddTagsToStreamRequest, nullptr) + ADD_REQUEST(DisableEnhancedMonitoring, DoDataStreamsDisableEnhancedMonitoringRequest, nullptr) + ADD_REQUEST(EnableEnhancedMonitoring, DoDataStreamsEnableEnhancedMonitoringRequest, nullptr) + ADD_REQUEST(ListTagsForStream, DoDataStreamsListTagsForStreamRequest, nullptr) + ADD_REQUEST(MergeShards, DoDataStreamsMergeShardsRequest, nullptr) + ADD_REQUEST(RemoveTagsFromStream, DoDataStreamsRemoveTagsFromStreamRequest, nullptr) + ADD_REQUEST(SplitShard, DoDataStreamsSplitShardRequest, nullptr) + ADD_REQUEST(StartStreamEncryption, DoDataStreamsStartStreamEncryptionRequest, nullptr) + ADD_REQUEST(StopStreamEncryption, DoDataStreamsStopStreamEncryptionRequest, nullptr) + ADD_REQUEST(UpdateStream, DoDataStreamsUpdateStreamRequest, nullptr) + ADD_REQUEST(SetWriteQuota, DoDataStreamsSetWriteQuotaRequest, nullptr) #undef ADD_REQUEST } diff --git a/ydb/services/datastreams/grpc_service.h b/ydb/services/datastreams/grpc_service.h index d8ed51df52..044aacb8a1 100644 --- a/ydb/services/datastreams/grpc_service.h +++ b/ydb/services/datastreams/grpc_service.h @@ -21,14 +21,12 @@ namespace NKikimr::NGRpcService { private: void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - void InitNewSchemeCache(); NActors::TActorSystem* ActorSystem_; grpc::ServerCompletionQueue* CQ_ = nullptr; TIntrusivePtr<NMonitoring::TDynamicCounters> Counters_; NActors::TActorId GRpcRequestProxyId_; - NActors::TActorId NewSchemeCache; NGrpc::TGlobalLimiter* Limiter_ = nullptr; }; diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h index ff2e2beb86..8923dce1c6 100644 --- a/ydb/services/datastreams/put_records_actor.h +++ b/ydb/services/datastreams/put_records_actor.h @@ -1,8 +1,8 @@ #pragma once +#include "datastreams_proxy.h" #include "events.h" -#include <ydb/core/grpc_services/grpc_request_proxy.h> #include <ydb/core/persqueue/events/global.h> #include <ydb/core/persqueue/write_meta.h> #include <ydb/core/protos/msgbus_pq.pb.h> @@ -17,8 +17,6 @@ namespace NKikimr::NDataStreams::V1 { - - struct TPutRecordsItem { TString Data; TString Key; @@ -218,7 +216,7 @@ namespace NKikimr::NDataStreams::V1 { using TBase = NGRpcProxy::V1::TPQGrpcSchemaBase<TPutRecordsActorBase<TDerived, TProto>, TProto>; public: - TPutRecordsActorBase(TProto* request, NActors::TActorId newSchemeCache); + TPutRecordsActorBase(NGRpcService::IRequestOpCtx* request); ~TPutRecordsActorBase() = default; void Bootstrap(const NActors::TActorContext &ctx); @@ -235,7 +233,6 @@ namespace NKikimr::NDataStreams::V1 { }; THashMap<ui32, TPartitionTask> PartitionToActor; - NActors::TActorId NewSchemeCache; Ydb::DataStreams::V1::PutRecordsResult PutRecordsResult; TString Ip; @@ -256,9 +253,8 @@ namespace NKikimr::NDataStreams::V1 { }; template<class TDerived, class TProto> - TPutRecordsActorBase<TDerived, TProto>::TPutRecordsActorBase(TProto* request, NActors::TActorId newSchemeCache) - : TBase(request, request->GetProtoRequest()->stream_name()) - , NewSchemeCache(std::move(newSchemeCache)) + TPutRecordsActorBase<TDerived, TProto>::TPutRecordsActorBase(NGRpcService::IRequestOpCtx* request) + : TBase(request, dynamic_cast<const typename TProto::TRequest*>(request->GetRequest())->stream_name()) , Ip(request->GetPeerName()) { Y_ENSURE(request); @@ -302,7 +298,7 @@ namespace NKikimr::NDataStreams::V1 { entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList; entry.SyncVersion = true; schemeCacheRequest->ResultSet.emplace_back(entry); - ctx.Send(NewSchemeCache, MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(schemeCacheRequest.release())); + ctx.Send(MakeSchemeCacheID(), MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(schemeCacheRequest.release())); } template<class TDerived, class TProto> @@ -402,8 +398,8 @@ namespace NKikimr::NDataStreams::V1 { public: using TBase = TPutRecordsActorBase<TPutRecordsActor, NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest>; - TPutRecordsActor(NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest* request, TActorId newSchemeCache) - : TBase(request, newSchemeCache) + TPutRecordsActor(NGRpcService::IRequestOpCtx* request) + : TBase(request) {} const Ydb::DataStreams::V1::PutRecordsRequest& GetPutRecordsRequest() const; @@ -424,15 +420,15 @@ namespace NKikimr::NDataStreams::V1 { public: using TBase = TPutRecordsActorBase<TPutRecordActor, NKikimr::NGRpcService::TEvDataStreamsPutRecordRequest>; - TPutRecordActor(NKikimr::NGRpcService::TEvDataStreamsPutRecordRequest* request, TActorId newSchemeCache) - : TBase(request, newSchemeCache) + TPutRecordActor(NGRpcService::IRequestOpCtx* request) + : TBase(request) { - PutRecordsRequest.set_stream_name(request->GetProtoRequest()->stream_name()); + PutRecordsRequest.set_stream_name(GetProtoRequest()->stream_name()); auto& record = *PutRecordsRequest.add_records(); - record.set_data(request->GetProtoRequest()->data()); - record.set_explicit_hash_key(request->GetProtoRequest()->explicit_hash_key()); - record.set_partition_key(request->GetProtoRequest()->partition_key()); + record.set_data(GetProtoRequest()->data()); + record.set_explicit_hash_key(GetProtoRequest()->explicit_hash_key()); + record.set_partition_key(GetProtoRequest()->partition_key()); } const Ydb::DataStreams::V1::PutRecordsRequest& GetPutRecordsRequest() const; diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h index 74e961793e..b9294498b2 100644 --- a/ydb/services/lib/actors/pq_schema_actor.h +++ b/ydb/services/lib/actors/pq_schema_actor.h @@ -66,8 +66,11 @@ namespace NKikimr::NGRpcProxy::V1 { protected: using TBase = NKikimr::NGRpcService::TRpcSchemeRequestActor<TDerived, TRequest>; + using TProtoRequest = typename TRequest::TRequest; + public: - TPQGrpcSchemaBase(TRequest *request, const TString& topicPath) + + TPQGrpcSchemaBase(NGRpcService::IRequestOpCtx *request, const TString& topicPath) : TBase(request) , TopicPath(topicPath) { @@ -286,7 +289,7 @@ namespace NKikimr::NGRpcProxy::V1 { using TBase = TPQGrpcSchemaBase<TDerived, TRequest>; public: - TUpdateSchemeActor(TRequest* request, const TString& topicPath) + TUpdateSchemeActor(NGRpcService::IRequestOpCtx* request, const TString& topicPath) : TBase(request, topicPath) {} ~TUpdateSchemeActor() = default; diff --git a/ydb/services/yq/grpc_service.cpp b/ydb/services/yq/grpc_service.cpp index c7c0b6b0ae..bb71dde092 100644 --- a/ydb/services/yq/grpc_service.cpp +++ b/ydb/services/yq/grpc_service.cpp @@ -3,6 +3,7 @@ #include <ydb/core/grpc_services/grpc_helper.h> #include <ydb/core/grpc_services/grpc_request_proxy.h> #include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/service_yq.h> #include <ydb/library/protobuf_printer/security_printer.h> namespace NKikimr::NGRpcService { @@ -34,102 +35,154 @@ void TGRpcYandexQueryService::DecRequest() { void TGRpcYandexQueryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + static const TVector<TString> CreateQueryPermissions = { + "yq.queries.create", + "yq.queries.invoke", + "yq.connections.use", + "yq.bindings.use", + "yq.resources.managePublic" + }; + static const TVector<TString> ListQueriesPermissions = { + "yq.queries.get", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> DescribeQueryPermissions = { + "yq.queries.get", + "yq.queries.viewAst", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> GetQueryStatusPermissions = { + "yq.queries.getStatus", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> ModifyQueryPermissions = { + "yq.queries.update", + "yq.queries.invoke", + "yq.connections.use", + "yq.bindings.use", + "yq.resources.managePublic", + "yq.resources.managePrivate" + }; + static const TVector<TString> DeleteQueryPermissions = { + "yq.queries.delete", + "yq.resources.managePublic", + "yq.resources.managePrivate" + }; + static const TVector<TString> ControlQueryPermissions = { + "yq.queries.control", + "yq.resources.managePublic", + "yq.resources.managePrivate" + }; + static const TVector<TString> GetResultDataPermissions = { + "yq.queries.getData", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> ListJobsPermissions = { + "yq.jobs.get", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> DescribeJobPermissions = { + "yq.jobs.get", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> CreateConnectionPermissions = { + "yq.connections.create", + "yq.resources.managePublic", + }; + static const TVector<TString> ListConnectionsPermissions = { + "yq.connections.get", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> DescribeConnectionPermissions = { + "yq.connections.get", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> ModifyConnectionPermissions = { + "yq.connections.update", + "yq.resources.managePublic", + "yq.resources.managePrivate", + }; + static const TVector<TString> DeleteConnectionPermissions = { + "yq.connections.delete", + "yq.resources.managePublic", + "yq.resources.managePrivate" + }; + static const TVector<TString> TestConnectionPermissions = { + "yq.connections.create", + }; + static const TVector<TString> CreateBindingPermissions = { + "yq.bindings.create", + "yq.resources.managePublic" + }; + static const TVector<TString> ListBindingsPermissions = { + "yq.bindings.get", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> DescribeBindingPermissions = { + "yq.bindings.get", + "yq.resources.viewPublic", + "yq.resources.viewPrivate" + }; + static const TVector<TString> ModifyBindingPermissions = { + "yq.bindings.update", + "yq.resources.managePublic", + "yq.resources.managePrivate" + }; + static const TVector<TString> DeleteBindingPermissions = { + "yq.bindings.delete", + "yq.resources.managePublic", + "yq.resources.managePrivate" + }; + #ifdef ADD_REQUEST #error ADD_REQUEST macro already defined #endif -#define ADD_REQUEST(NAME, IN, OUT, ACTION) \ - MakeIntrusive<TGRpcRequest<YandexQuery::IN, YandexQuery::OUT, TGRpcYandexQueryService, TSecurityTextFormatPrinter<YandexQuery::IN>, TSecurityTextFormatPrinter<YandexQuery::OUT>>>(this, &Service_, CQ_, \ - [this](NGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ACTION; \ - }, &YandexQuery::V1::YandexQueryService::AsyncService::Request ## NAME, \ - #NAME, logger, getCounterBlock("yq", #NAME))->Run(); - - ADD_REQUEST(CreateQuery, CreateQueryRequest, CreateQueryResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryCreateQueryRequest(ctx)); - }) - - ADD_REQUEST(ListQueries, ListQueriesRequest, ListQueriesResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryListQueriesRequest(ctx)); - }) - - ADD_REQUEST(DescribeQuery, DescribeQueryRequest, DescribeQueryResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryDescribeQueryRequest(ctx)); - }) - - ADD_REQUEST(GetQueryStatus, GetQueryStatusRequest, GetQueryStatusResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryGetQueryStatusRequest(ctx)); - }) - - ADD_REQUEST(ModifyQuery, ModifyQueryRequest, ModifyQueryResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryModifyQueryRequest(ctx)); - }) - - ADD_REQUEST(DeleteQuery, DeleteQueryRequest, DeleteQueryResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryDeleteQueryRequest(ctx)); - }) - - ADD_REQUEST(ControlQuery, ControlQueryRequest, ControlQueryResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryControlQueryRequest(ctx)); - }) - - ADD_REQUEST(GetResultData, GetResultDataRequest, GetResultDataResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryGetResultDataRequest(ctx)); - }) - - ADD_REQUEST(ListJobs, ListJobsRequest, ListJobsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryListJobsRequest(ctx)); - }) - - ADD_REQUEST(DescribeJob, DescribeJobRequest, DescribeJobResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryDescribeJobRequest(ctx)); - }) - - ADD_REQUEST(CreateConnection, CreateConnectionRequest, CreateConnectionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryCreateConnectionRequest(ctx)); - }) - - ADD_REQUEST(ListConnections, ListConnectionsRequest, ListConnectionsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryListConnectionsRequest(ctx)); - }) - - ADD_REQUEST(DescribeConnection, DescribeConnectionRequest, DescribeConnectionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryDescribeConnectionRequest(ctx)); - }) - - ADD_REQUEST(ModifyConnection, ModifyConnectionRequest, ModifyConnectionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryModifyConnectionRequest(ctx)); - }) - - ADD_REQUEST(DeleteConnection, DeleteConnectionRequest, DeleteConnectionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryDeleteConnectionRequest(ctx)); - }) - - ADD_REQUEST(TestConnection, TestConnectionRequest, TestConnectionResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryTestConnectionRequest(ctx)); - }) - - ADD_REQUEST(CreateBinding, CreateBindingRequest, CreateBindingResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryCreateBindingRequest(ctx)); - }) - - ADD_REQUEST(ListBindings, ListBindingsRequest, ListBindingsResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryListBindingsRequest(ctx)); - }) - - ADD_REQUEST(DescribeBinding, DescribeBindingRequest, DescribeBindingResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryDescribeBindingRequest(ctx)); - }) - - ADD_REQUEST(ModifyBinding, ModifyBindingRequest, ModifyBindingResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryModifyBindingRequest(ctx)); - }) - - ADD_REQUEST(DeleteBinding, DeleteBindingRequest, DeleteBindingResponse, { - ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryDeleteBindingRequest(ctx)); - }) +#define ADD_REQUEST(NAME, CB, PERMISSIONS) \ +MakeIntrusive<TGRpcRequest<YandexQuery::NAME##Request, YandexQuery::NAME##Response, TGRpcYandexQueryService, TSecurityTextFormatPrinter<YandexQuery::NAME##Request>, TSecurityTextFormatPrinter<YandexQuery::NAME##Response>>>( \ + this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcYqRequestOperationCall<YandexQuery::NAME##Request, YandexQuery::NAME##Response> \ + (ctx, &CB, PERMISSIONS)); \ + }, \ + &YandexQuery::V1::YandexQueryService::AsyncService::Request##NAME, \ + #NAME, logger, getCounterBlock("yq", #NAME)) \ + ->Run(); \ + + ADD_REQUEST(CreateQuery, DoYandexQueryCreateQueryRequest, CreateQueryPermissions) + ADD_REQUEST(ListQueries, DoYandexQueryListQueriesRequest, ListQueriesPermissions) + ADD_REQUEST(DescribeQuery, DoYandexQueryDescribeQueryRequest, DescribeQueryPermissions) + ADD_REQUEST(GetQueryStatus, DoYandexQueryGetQueryStatusRequest, GetQueryStatusPermissions) + ADD_REQUEST(ModifyQuery, DoYandexQueryModifyQueryRequest, ModifyQueryPermissions) + ADD_REQUEST(DeleteQuery, DoYandexQueryDeleteQueryRequest, DeleteQueryPermissions) + ADD_REQUEST(ControlQuery, DoYandexQueryControlQueryRequest, ControlQueryPermissions) + ADD_REQUEST(GetResultData, DoGetResultDataRequest, GetResultDataPermissions) + ADD_REQUEST(ListJobs, DoListJobsRequest, ListJobsPermissions) + ADD_REQUEST(DescribeJob, DoDescribeJobRequest, DescribeJobPermissions) + ADD_REQUEST(CreateConnection, DoCreateConnectionRequest, CreateConnectionPermissions) + ADD_REQUEST(ListConnections, DoListConnectionsRequest, ListConnectionsPermissions) + ADD_REQUEST(DescribeConnection, DoDescribeConnectionRequest, DescribeConnectionPermissions) + ADD_REQUEST(ModifyConnection, DoModifyConnectionRequest, ModifyConnectionPermissions) + ADD_REQUEST(DeleteConnection, DoDeleteConnectionRequest, DeleteConnectionPermissions) + ADD_REQUEST(TestConnection, DoTestConnectionRequest, TestConnectionPermissions) + ADD_REQUEST(CreateBinding, DoCreateBindingRequest, CreateBindingPermissions) + ADD_REQUEST(ListBindings, DoListBindingsRequest, ListBindingsPermissions) + ADD_REQUEST(DescribeBinding, DoDescribeBindingRequest, DescribeBindingPermissions) + ADD_REQUEST(ModifyBinding, DoModifyBindingRequest, ModifyBindingPermissions) + ADD_REQUEST(DeleteBinding, DoDeleteBindingRequest, DeleteBindingPermissions) #undef ADD_REQUEST + } } // namespace NKikimr::NGRpcService |