aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoralexnick <alexnick@yandex-team.ru>2022-06-06 14:41:13 +0300
committerDaniil Cherednik <dcherednik@yandex-team.ru>2022-06-06 14:41:13 +0300
commita2c8f380bc23c5c6b73d8049951b7e6f37cac904 (patch)
tree54a10681713a4159c437dd8123f2c577a1c01971
parent349643054488818105f9d79204fd40988347d340 (diff)
downloadydb-a2c8f380bc23c5c6b73d8049951b7e6f37cac904.tar.gz
KIKIMR-14665 inside ydb use only local rpc calls
KIKIMR-14665 inside ydb use only local rpc calls REVIEW: 2485498 Invert grpc_request_proxy dependence for datastream service. KIKIMR-13646 REVIEW: 2396725 REVIEW: 2488385 x-ydb-stable-ref: f5aed4ff2239e96f0fcd0bb4a4e060858d680ad8
-rw-r--r--ydb/core/base/ticket_parser.h31
-rw-r--r--ydb/core/grpc_services/base/base.h57
-rw-r--r--ydb/core/grpc_services/grpc_request_check_actor.h151
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp52
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.h51
-rw-r--r--ydb/core/grpc_services/rpc_analytics_internal.cpp5
-rw-r--r--ydb/core/grpc_services/rpc_calls.h52
-rw-r--r--ydb/core/grpc_services/rpc_yq.cpp896
-rw-r--r--ydb/core/grpc_services/service_datastreams.h46
-rw-r--r--ydb/core/grpc_services/service_yq.h92
-rw-r--r--ydb/core/http_proxy/auth_factory.cpp8
-rw-r--r--ydb/core/http_proxy/discovery_actor.cpp1
-rw-r--r--ydb/core/http_proxy/driver_cache_actor.cpp80
-rw-r--r--ydb/core/http_proxy/driver_cache_actor.h8
-rw-r--r--ydb/core/http_proxy/events.h5
-rw-r--r--ydb/core/http_proxy/http_req.cpp130
-rw-r--r--ydb/core/http_proxy/http_req.h1
-rw-r--r--ydb/core/http_proxy/http_service.cpp16
-rw-r--r--ydb/core/http_proxy/http_service.h1
-rw-r--r--ydb/core/http_proxy/ya.make5
-rw-r--r--ydb/core/testlib/service_mocks/access_service_mock.h6
-rw-r--r--ydb/core/yq/libs/control_plane_proxy/utils.h12
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp457
-rw-r--r--ydb/services/datastreams/datastreams_proxy.h121
-rw-r--r--ydb/services/datastreams/grpc_service.cpp190
-rw-r--r--ydb/services/datastreams/grpc_service.h2
-rw-r--r--ydb/services/datastreams/put_records_actor.h30
-rw-r--r--ydb/services/lib/actors/pq_schema_actor.h7
-rw-r--r--ydb/services/yq/grpc_service.cpp235
29 files changed, 928 insertions, 1820 deletions
diff --git a/ydb/core/base/ticket_parser.h b/ydb/core/base/ticket_parser.h
index 1514c74a05..5ebde54b6c 100644
--- a/ydb/core/base/ticket_parser.h
+++ b/ydb/core/base/ticket_parser.h
@@ -38,6 +38,18 @@ namespace NKikimr {
// only one of them will be processed. Which one is not guaranteed
const std::vector<TEntry> Entries;
+ struct TAccessKeySignature {
+ TString AccessKeyId;
+ TString StringToSign;
+ TString Signature;
+
+ TInstant SignedAt;
+ TString Service;
+ TString Region;
+ };
+
+ const TAccessKeySignature Signature;
+
struct TInitializationFields {
TString Database;
TString Ticket;
@@ -83,6 +95,14 @@ namespace NKikimr {
, PeerName(peerName)
, Entries(entries)
{}
+
+ TEvAuthorizeTicket(TAccessKeySignature&& sign, const TString& peerName, const TVector<TEntry>& entries)
+ : Ticket("")
+ , PeerName(peerName)
+ , Entries(entries)
+ , Signature(std::move(sign))
+ {}
+
};
struct TError {
@@ -162,3 +182,14 @@ template <>
inline void Out<NKikimr::TEvTicketParser::TError>(IOutputStream& str, const NKikimr::TEvTicketParser::TError& error) {
str << error.Message;
}
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class ICheckerIface {
+public:
+ virtual void SetEntries(const TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry>& entries) = 0;
+};
+
+}
+}
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 44b25c4a5f..5c5d59dc8b 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -1024,20 +1024,65 @@ private:
TMaybe<NRpcService::TRlPath> RlPath;
};
+template<ui32 TRpcId, typename TReq, typename TResp, bool IsOperation, typename TDerived>
+class TGRpcRequestValidationWrapperImpl :
+ public TGRpcRequestWrapperImpl<TRpcId, TReq, TResp, IsOperation, TDerived> {
+
+public:
+ static IActor* CreateRpcActor(typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type* msg);
+ TGRpcRequestValidationWrapperImpl(NGrpc::IRequestContextBase* ctx)
+ : TGRpcRequestWrapperImpl<TRpcId, TReq, TResp, IsOperation, TDerived>(ctx)
+ { }
+
+ bool Validate(TString& error) override {
+ return this->GetProtoRequest()->validate(error);
+ }
+};
+
+// SFINAE
+// Check protobuf has validate feature
+template<typename TProto>
+struct TProtoHasValidate {
+private:
+ static int Detect(...);
+ // validate function has prototype: bool validate(TProtoStringType&) const
+ static TProtoStringType Dummy_;
+ template<typename U>
+ static decltype(std::declval<U>().validate(Dummy_)) Detect(const U&);
+public:
+ static constexpr bool Value = std::is_same<bool, decltype(Detect(std::declval<TProto>()))>::value;
+};
+
class IFacilityProvider;
template <typename TReq, typename TResp, bool IsOperation>
class TGrpcRequestCall
- : public TGRpcRequestWrapperImpl<
- TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, IsOperation, TGrpcRequestCall<TReq, TResp, IsOperation>> {
+ : public std::conditional_t<TProtoHasValidate<TReq>::Value,
+ TGRpcRequestValidationWrapperImpl<
+ TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, IsOperation, TGrpcRequestCall<TReq, TResp, IsOperation>>,
+ TGRpcRequestWrapperImpl<
+ TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, IsOperation, TGrpcRequestCall<TReq, TResp, IsOperation>>>
+ {
typedef typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type TRequestIface;
public:
static constexpr bool IsOp = IsOperation;
+ static IActor* CreateRpcActor(typename std::conditional<IsOperation, IRequestOpCtx, IRequestNoOpCtx>::type* msg);
+ using TBase = std::conditional_t<TProtoHasValidate<TReq>::Value,
+ TGRpcRequestValidationWrapperImpl<
+ TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, IsOperation, TGrpcRequestCall<TReq, TResp, IsOperation>>,
+ TGRpcRequestWrapperImpl<
+ TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, IsOperation, TGrpcRequestCall<TReq, TResp, IsOperation>>>;
+
TGrpcRequestCall(NGrpc::IRequestContextBase* ctx,
void (*cb)(std::unique_ptr<TRequestIface>, const IFacilityProvider&), TRequestAuxSettings auxSettings = {})
- : TGRpcRequestWrapperImpl<
- TRpcServices::EvGrpcRuntimeRequest, TReq, TResp, IsOperation,
- TGrpcRequestCall<TReq, TResp, IsOperation>>(ctx)
+ : TBase(ctx)
+ , PassMethod(cb)
+ , AuxSettings(std::move(auxSettings))
+ { }
+
+ TGrpcRequestCall(NGrpc::IRequestContextBase* ctx,
+ std::function<void(std::unique_ptr<TRequestIface>, const IFacilityProvider&)> cb, TRequestAuxSettings auxSettings = {})
+ : TBase(ctx)
, PassMethod(cb)
, AuxSettings(std::move(auxSettings))
{ }
@@ -1061,7 +1106,7 @@ public:
}
}
private:
- void (*PassMethod)(std::unique_ptr<TRequestIface>, const IFacilityProvider&);
+ std::function<void(std::unique_ptr<TRequestIface>, const IFacilityProvider&)> PassMethod;
const TRequestAuxSettings AuxSettings;
};
diff --git a/ydb/core/grpc_services/grpc_request_check_actor.h b/ydb/core/grpc_services/grpc_request_check_actor.h
index 6c4be43394..9ee1b7a3f3 100644
--- a/ydb/core/grpc_services/grpc_request_check_actor.h
+++ b/ydb/core/grpc_services/grpc_request_check_actor.h
@@ -18,11 +18,6 @@
namespace NKikimr {
namespace NGRpcService {
-class ICheckerIface {
-public:
- void SetEntries(const TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry>& entries);
-};
-
template <typename TEvent>
class TGrpcRequestCheckActor
: public TActorBootstrappedSecureRequest<TGrpcRequestCheckActor<TEvent>>
@@ -72,8 +67,6 @@ public:
TBase::SetEntries(entries);
}
- void InitializeAttributesFromYandexQuery(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
void InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
TGrpcRequestCheckActor(
@@ -424,71 +417,6 @@ void TGrpcRequestCheckActor<TEvent>::InitializeAttributes(const TSchemeBoardEven
InitializeAttributesFromSchema(schemeData);
}
-// yq behavior
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryCreateQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryListQueriesRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryDescribeQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryGetQueryStatusRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryModifyQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryDeleteQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryControlQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryGetResultDataRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryListJobsRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryDescribeJobRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryCreateConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryListConnectionsRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryDescribeConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryModifyConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryDeleteConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryTestConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryCreateBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryListBindingsRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryDescribeBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryModifyBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryDeleteBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData);
-
-
// default permissions
template <typename TEvent>
const TVector<TString>& TGrpcRequestCheckActor<TEvent>::GetPermissions() {
@@ -500,85 +428,6 @@ const TVector<TString>& TGrpcRequestCheckActor<TEvent>::GetPermissions() {
return permissions;
}
-// yds behavior
-template <>
-inline const TVector<TString>& TGrpcRequestCheckActor<TEvDataStreamsPutRecordRequest>::GetPermissions() {
- //full list of permissions for compatility. remove old permissions later.
- static const TVector<TString> permissions = {"ydb.streams.write", "ydb.databases.list", "ydb.databases.create", "ydb.databases.connect"};
- return permissions;
-}
-// yds behavior
-template <>
-inline const TVector<TString>& TGrpcRequestCheckActor<TEvDataStreamsPutRecordsRequest>::GetPermissions() {
- //full list of permissions for compatility. remove old permissions later.
- static const TVector<TString> permissions = {"ydb.streams.write", "ydb.databases.list", "ydb.databases.create", "ydb.databases.connect"};
- return permissions;
-}
-
-// yq behavior
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryCreateQueryRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListQueriesRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeQueryRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryGetQueryStatusRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryModifyQueryRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDeleteQueryRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryControlQueryRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryGetResultDataRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListJobsRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeJobRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryCreateConnectionRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListConnectionsRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeConnectionRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryModifyConnectionRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDeleteConnectionRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryTestConnectionRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryCreateBindingRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListBindingsRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeBindingRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryModifyBindingRequest>::GetPermissions();
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDeleteBindingRequest>::GetPermissions();
-
template <typename TEvent>
IActor* CreateGrpcRequestCheckActor(
const TActorId& owner,
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index fd8c78ab94..c44c7f8f6d 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -640,58 +640,6 @@ void TGRpcRequestProxyImpl::StateFunc(TAutoPtr<IEventHandle>& ev, const TActorCo
HFunc(TEvLongTxRollbackRequest, PreHandle);
HFunc(TEvLongTxWriteRequest, PreHandle);
HFunc(TEvLongTxReadRequest, PreHandle);
- HFunc(TEvDataStreamsCreateStreamRequest, PreHandle);
- HFunc(TEvDataStreamsDeleteStreamRequest, PreHandle);
- HFunc(TEvDataStreamsDescribeStreamRequest, PreHandle);
- HFunc(TEvDataStreamsRegisterStreamConsumerRequest, PreHandle);
- HFunc(TEvDataStreamsDeregisterStreamConsumerRequest, PreHandle);
- HFunc(TEvDataStreamsDescribeStreamConsumerRequest, PreHandle);
- HFunc(TEvDataStreamsPutRecordRequest, PreHandle);
- HFunc(TEvDataStreamsListStreamsRequest, PreHandle);
- HFunc(TEvDataStreamsListShardsRequest, PreHandle);
- HFunc(TEvDataStreamsPutRecordsRequest, PreHandle);
- HFunc(TEvDataStreamsGetRecordsRequest, PreHandle);
- HFunc(TEvDataStreamsGetShardIteratorRequest, PreHandle);
- HFunc(TEvDataStreamsSubscribeToShardRequest, PreHandle);
- HFunc(TEvDataStreamsDescribeLimitsRequest, PreHandle);
- HFunc(TEvDataStreamsDescribeStreamSummaryRequest, PreHandle);
- HFunc(TEvDataStreamsDecreaseStreamRetentionPeriodRequest, PreHandle);
- HFunc(TEvDataStreamsIncreaseStreamRetentionPeriodRequest, PreHandle);
- HFunc(TEvDataStreamsUpdateShardCountRequest, PreHandle);
- HFunc(TEvDataStreamsUpdateStreamRequest, PreHandle);
- HFunc(TEvDataStreamsSetWriteQuotaRequest, PreHandle);
- HFunc(TEvDataStreamsListStreamConsumersRequest, PreHandle);
- HFunc(TEvDataStreamsAddTagsToStreamRequest, PreHandle);
- HFunc(TEvDataStreamsDisableEnhancedMonitoringRequest, PreHandle);
- HFunc(TEvDataStreamsEnableEnhancedMonitoringRequest, PreHandle);
- HFunc(TEvDataStreamsListTagsForStreamRequest, PreHandle);
- HFunc(TEvDataStreamsMergeShardsRequest, PreHandle);
- HFunc(TEvDataStreamsRemoveTagsFromStreamRequest, PreHandle);
- HFunc(TEvDataStreamsSplitShardRequest, PreHandle);
- HFunc(TEvDataStreamsStartStreamEncryptionRequest, PreHandle);
- HFunc(TEvDataStreamsStopStreamEncryptionRequest, PreHandle);
- HFunc(TEvYandexQueryCreateQueryRequest, PreHandle);
- HFunc(TEvYandexQueryListQueriesRequest, PreHandle);
- HFunc(TEvYandexQueryDescribeQueryRequest, PreHandle);
- HFunc(TEvYandexQueryGetQueryStatusRequest, PreHandle);
- HFunc(TEvYandexQueryModifyQueryRequest, PreHandle);
- HFunc(TEvYandexQueryDeleteQueryRequest, PreHandle);
- HFunc(TEvYandexQueryControlQueryRequest, PreHandle);
- HFunc(TEvYandexQueryGetResultDataRequest, PreHandle);
- HFunc(TEvYandexQueryListJobsRequest, PreHandle);
- HFunc(TEvYandexQueryDescribeJobRequest, PreHandle);
- HFunc(TEvYandexQueryCreateConnectionRequest, PreHandle);
- HFunc(TEvYandexQueryListConnectionsRequest, PreHandle);
- HFunc(TEvYandexQueryDescribeConnectionRequest, PreHandle);
- HFunc(TEvYandexQueryModifyConnectionRequest, PreHandle);
- HFunc(TEvYandexQueryDeleteConnectionRequest, PreHandle);
- HFunc(TEvYandexQueryTestConnectionRequest, PreHandle);
- HFunc(TEvYandexQueryCreateBindingRequest, PreHandle);
- HFunc(TEvYandexQueryListBindingsRequest, PreHandle);
- HFunc(TEvYandexQueryDescribeBindingRequest, PreHandle);
- HFunc(TEvYandexQueryModifyBindingRequest, PreHandle);
- HFunc(TEvYandexQueryDeleteBindingRequest, PreHandle);
-
HFunc(TEvProxyRuntimeEvent, PreHandle);
default:
diff --git a/ydb/core/grpc_services/grpc_request_proxy.h b/ydb/core/grpc_services/grpc_request_proxy.h
index d1d1981cfe..78dedf49d2 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.h
+++ b/ydb/core/grpc_services/grpc_request_proxy.h
@@ -121,57 +121,6 @@ protected:
void Handle(TEvLongTxRollbackRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvLongTxWriteRequest::TPtr& ev, const TActorContext& ctx);
void Handle(TEvLongTxReadRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsCreateStreamRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsDeleteStreamRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsDescribeStreamRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsRegisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsDeregisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsDescribeStreamConsumerRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsPutRecordRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsListStreamsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsListShardsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsPutRecordsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsGetRecordsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsGetShardIteratorRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsSubscribeToShardRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsDescribeLimitsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsDescribeStreamSummaryRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsDecreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsIncreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsUpdateShardCountRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsUpdateStreamRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsSetWriteQuotaRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsListStreamConsumersRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsAddTagsToStreamRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsDisableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsEnableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsListTagsForStreamRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsMergeShardsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsRemoveTagsFromStreamRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsSplitShardRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsStartStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvDataStreamsStopStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryCreateQueryRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryListQueriesRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryDescribeQueryRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryGetQueryStatusRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryModifyQueryRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryDeleteQueryRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryControlQueryRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryGetResultDataRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryListJobsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryDescribeJobRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryCreateConnectionRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryListConnectionsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryDescribeConnectionRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryModifyConnectionRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryDeleteConnectionRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryTestConnectionRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryCreateBindingRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryListBindingsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryDescribeBindingRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryModifyBindingRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(TEvYandexQueryDeleteBindingRequest::TPtr& ev, const TActorContext& ctx);
TActorId DiscoveryCacheActorID;
};
diff --git a/ydb/core/grpc_services/rpc_analytics_internal.cpp b/ydb/core/grpc_services/rpc_analytics_internal.cpp
index ae3ac7bc59..48227a57fd 100644
--- a/ydb/core/grpc_services/rpc_analytics_internal.cpp
+++ b/ydb/core/grpc_services/rpc_analytics_internal.cpp
@@ -26,10 +26,7 @@ namespace {
template <typename TEv, typename TReq>
void SendResponse(const TEv& ev, TReq& req) {
if (!ev->Get()->Record) {
- for (const auto& x : ev->Get()->Issues) {
- req.RaiseIssue(x);
- }
-
+ req.RaiseIssues(ev->Get()->Issues);
req.ReplyWithYdbStatus(ev->Get()->Status);
} else {
req.SendResult(*ev->Get()->Record, ev->Get()->Status);
diff --git a/ydb/core/grpc_services/rpc_calls.h b/ydb/core/grpc_services/rpc_calls.h
index c1b396ae05..e866c9023a 100644
--- a/ydb/core/grpc_services/rpc_calls.h
+++ b/ydb/core/grpc_services/rpc_calls.h
@@ -24,7 +24,6 @@
#include <ydb/public/api/grpc/draft/dummy.pb.h>
#include <ydb/public/api/grpc/draft/ydb_long_tx_v1.pb.h>
-#include <ydb/public/api/grpc/draft/ydb_datastreams_v1.pb.h>
#include <ydb/public/lib/operation_id/operation_id.h>
@@ -119,57 +118,6 @@ using TEvLongTxCommitRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxCommit,
using TEvLongTxRollbackRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxRollback, Ydb::LongTx::RollbackTransactionRequest, Ydb::LongTx::RollbackTransactionResponse, true>;
using TEvLongTxWriteRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxWrite, Ydb::LongTx::WriteRequest, Ydb::LongTx::WriteResponse, true>;
using TEvLongTxReadRequest = TGRpcRequestWrapper<TRpcServices::EvLongTxRead, Ydb::LongTx::ReadRequest, Ydb::LongTx::ReadResponse, true>;
-using TEvDataStreamsCreateStreamRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsCreateStream, Ydb::DataStreams::V1::CreateStreamRequest, Ydb::DataStreams::V1::CreateStreamResponse, true>;
-using TEvDataStreamsDeleteStreamRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDeleteStream, Ydb::DataStreams::V1::DeleteStreamRequest, Ydb::DataStreams::V1::DeleteStreamResponse, true>;
-using TEvDataStreamsDescribeStreamRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDescribeStream, Ydb::DataStreams::V1::DescribeStreamRequest, Ydb::DataStreams::V1::DescribeStreamResponse, true>;
-using TEvDataStreamsRegisterStreamConsumerRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsRegisterStreamConsumer, Ydb::DataStreams::V1::RegisterStreamConsumerRequest, Ydb::DataStreams::V1::RegisterStreamConsumerResponse, true>;
-using TEvDataStreamsDeregisterStreamConsumerRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDeregisterStreamConsumer, Ydb::DataStreams::V1::DeregisterStreamConsumerRequest, Ydb::DataStreams::V1::DeregisterStreamConsumerResponse, true>;
-using TEvDataStreamsDescribeStreamConsumerRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDescribeStreamConsumer, Ydb::DataStreams::V1::DescribeStreamConsumerRequest, Ydb::DataStreams::V1::DescribeStreamConsumerResponse, true>;
-using TEvDataStreamsPutRecordRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsPutRecord, Ydb::DataStreams::V1::PutRecordRequest, Ydb::DataStreams::V1::PutRecordResponse, true>;
-using TEvDataStreamsListStreamsRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsListStreams, Ydb::DataStreams::V1::ListStreamsRequest, Ydb::DataStreams::V1::ListStreamsResponse, true>;
-using TEvDataStreamsListShardsRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsListShards, Ydb::DataStreams::V1::ListShardsRequest, Ydb::DataStreams::V1::ListShardsResponse, true>;
-using TEvDataStreamsPutRecordsRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsPutRecords, Ydb::DataStreams::V1::PutRecordsRequest, Ydb::DataStreams::V1::PutRecordsResponse, true>;
-using TEvDataStreamsGetRecordsRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsGetRecords, Ydb::DataStreams::V1::GetRecordsRequest, Ydb::DataStreams::V1::GetRecordsResponse, true>;
-using TEvDataStreamsGetShardIteratorRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsGetShardIterator, Ydb::DataStreams::V1::GetShardIteratorRequest, Ydb::DataStreams::V1::GetShardIteratorResponse, true>;
-using TEvDataStreamsSubscribeToShardRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsSubscribeToShard, Ydb::DataStreams::V1::SubscribeToShardRequest, Ydb::DataStreams::V1::SubscribeToShardResponse, true>;
-using TEvDataStreamsDescribeLimitsRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDescribeLimits, Ydb::DataStreams::V1::DescribeLimitsRequest, Ydb::DataStreams::V1::DescribeLimitsResponse, true>;
-using TEvDataStreamsDescribeStreamSummaryRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDescribeStreamSummary, Ydb::DataStreams::V1::DescribeStreamSummaryRequest, Ydb::DataStreams::V1::DescribeStreamSummaryResponse, true>;
-using TEvDataStreamsDecreaseStreamRetentionPeriodRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDecreaseStreamRetentionPeriod, Ydb::DataStreams::V1::DecreaseStreamRetentionPeriodRequest, Ydb::DataStreams::V1::DecreaseStreamRetentionPeriodResponse, true>;
-using TEvDataStreamsIncreaseStreamRetentionPeriodRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsIncreaseStreamRetentionPeriod, Ydb::DataStreams::V1::IncreaseStreamRetentionPeriodRequest, Ydb::DataStreams::V1::IncreaseStreamRetentionPeriodResponse, true>;
-using TEvDataStreamsUpdateShardCountRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsUpdateShardCount, Ydb::DataStreams::V1::UpdateShardCountRequest, Ydb::DataStreams::V1::UpdateShardCountResponse, true>;
-using TEvDataStreamsUpdateStreamRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsUpdateStream, Ydb::DataStreams::V1::UpdateStreamRequest, Ydb::DataStreams::V1::UpdateStreamResponse, true>;
-using TEvDataStreamsSetWriteQuotaRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsSetWriteQuota, Ydb::DataStreams::V1::SetWriteQuotaRequest, Ydb::DataStreams::V1::SetWriteQuotaResponse, true>;
-using TEvDataStreamsListStreamConsumersRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsListStreamConsumers, Ydb::DataStreams::V1::ListStreamConsumersRequest, Ydb::DataStreams::V1::ListStreamConsumersResponse, true>;
-using TEvDataStreamsAddTagsToStreamRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsAddTagsToStream, Ydb::DataStreams::V1::AddTagsToStreamRequest, Ydb::DataStreams::V1::AddTagsToStreamResponse, true>;
-using TEvDataStreamsDisableEnhancedMonitoringRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsDisableEnhancedMonitoring, Ydb::DataStreams::V1::DisableEnhancedMonitoringRequest, Ydb::DataStreams::V1::DisableEnhancedMonitoringResponse, true>;
-using TEvDataStreamsEnableEnhancedMonitoringRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsEnableEnhancedMonitoring, Ydb::DataStreams::V1::EnableEnhancedMonitoringRequest, Ydb::DataStreams::V1::EnableEnhancedMonitoringResponse, true>;
-using TEvDataStreamsListTagsForStreamRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsListTagsForStream, Ydb::DataStreams::V1::ListTagsForStreamRequest, Ydb::DataStreams::V1::ListTagsForStreamResponse, true>;
-using TEvDataStreamsMergeShardsRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsMergeShards, Ydb::DataStreams::V1::MergeShardsRequest, Ydb::DataStreams::V1::MergeShardsResponse, true>;
-using TEvDataStreamsRemoveTagsFromStreamRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsRemoveTagsFromStream, Ydb::DataStreams::V1::RemoveTagsFromStreamRequest, Ydb::DataStreams::V1::RemoveTagsFromStreamResponse, true>;
-using TEvDataStreamsSplitShardRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsSplitShard, Ydb::DataStreams::V1::SplitShardRequest, Ydb::DataStreams::V1::SplitShardResponse, true>;
-using TEvDataStreamsStartStreamEncryptionRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsStartStreamEncryption, Ydb::DataStreams::V1::StartStreamEncryptionRequest, Ydb::DataStreams::V1::StartStreamEncryptionResponse, true>;
-using TEvDataStreamsStopStreamEncryptionRequest = TGRpcRequestWrapper<TRpcServices::EvDataStreamsStopStreamEncryption, Ydb::DataStreams::V1::StopStreamEncryptionRequest, Ydb::DataStreams::V1::StopStreamEncryptionResponse, true>;
-using TEvYandexQueryCreateQueryRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryCreateQuery, YandexQuery::CreateQueryRequest, YandexQuery::CreateQueryResponse, true>;
-using TEvYandexQueryListQueriesRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryListQueries, YandexQuery::ListQueriesRequest, YandexQuery::ListQueriesResponse, true>;
-using TEvYandexQueryDescribeQueryRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryDescribeQuery, YandexQuery::DescribeQueryRequest, YandexQuery::DescribeQueryResponse, true>;
-using TEvYandexQueryGetQueryStatusRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryGetQueryStatus, YandexQuery::GetQueryStatusRequest, YandexQuery::GetQueryStatusResponse, true>;
-using TEvYandexQueryModifyQueryRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryModifyQuery, YandexQuery::ModifyQueryRequest, YandexQuery::ModifyQueryResponse, true>;
-using TEvYandexQueryDeleteQueryRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryDeleteQuery, YandexQuery::DeleteQueryRequest, YandexQuery::DeleteQueryResponse, true>;
-using TEvYandexQueryControlQueryRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryControlQuery, YandexQuery::ControlQueryRequest, YandexQuery::ControlQueryResponse, true>;
-using TEvYandexQueryGetResultDataRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryGetResultData, YandexQuery::GetResultDataRequest, YandexQuery::GetResultDataResponse, true>;
-using TEvYandexQueryListJobsRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryListJobs, YandexQuery::ListJobsRequest, YandexQuery::ListJobsResponse, true>;
-using TEvYandexQueryDescribeJobRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryDescribeJob, YandexQuery::DescribeJobRequest, YandexQuery::DescribeJobResponse, true>;
-using TEvYandexQueryCreateConnectionRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryCreateConnection, YandexQuery::CreateConnectionRequest, YandexQuery::CreateConnectionResponse, true>;
-using TEvYandexQueryListConnectionsRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryListConnections, YandexQuery::ListConnectionsRequest, YandexQuery::ListConnectionsResponse, true>;
-using TEvYandexQueryDescribeConnectionRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryDescribeConnection, YandexQuery::DescribeConnectionRequest, YandexQuery::DescribeConnectionResponse, true>;
-using TEvYandexQueryModifyConnectionRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryModifyConnection, YandexQuery::ModifyConnectionRequest, YandexQuery::ModifyConnectionResponse, true>;
-using TEvYandexQueryDeleteConnectionRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryDeleteConnection, YandexQuery::DeleteConnectionRequest, YandexQuery::DeleteConnectionResponse, true>;
-using TEvYandexQueryTestConnectionRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryTestConnection, YandexQuery::TestConnectionRequest, YandexQuery::TestConnectionResponse, true>;
-using TEvYandexQueryCreateBindingRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryCreateBinding, YandexQuery::CreateBindingRequest, YandexQuery::CreateBindingResponse, true>;
-using TEvYandexQueryListBindingsRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryListBindings, YandexQuery::ListBindingsRequest, YandexQuery::ListBindingsResponse, true>;
-using TEvYandexQueryDescribeBindingRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryDescribeBinding, YandexQuery::DescribeBindingRequest, YandexQuery::DescribeBindingResponse, true>;
-using TEvYandexQueryModifyBindingRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryModifyBinding, YandexQuery::ModifyBindingRequest, YandexQuery::ModifyBindingResponse, true>;
-using TEvYandexQueryDeleteBindingRequest = TGRpcRequestWrapper<TRpcServices::EvYandexQueryDeleteBinding, YandexQuery::DeleteBindingRequest, YandexQuery::DeleteBindingResponse, true>;
} // namespace NGRpcService
} // namespace NKikimr
diff --git a/ydb/core/grpc_services/rpc_yq.cpp b/ydb/core/grpc_services/rpc_yq.cpp
index 6e832ef5c7..d515056c3e 100644
--- a/ydb/core/grpc_services/rpc_yq.cpp
+++ b/ydb/core/grpc_services/rpc_yq.cpp
@@ -1,10 +1,7 @@
-#include "grpc_request_proxy.h"
-#include "grpc_request_check_actor.h"
-
-#include "rpc_calls.h"
#include "rpc_common.h"
#include "rpc_deferrable.h"
+#include <ydb/core/grpc_services/service_yq.h>
#include <ydb/core/yq/libs/audit/events/events.h>
#include <ydb/core/yq/libs/audit/yq_audit_service.h>
#include <ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h>
@@ -23,11 +20,21 @@ namespace NGRpcService {
using namespace Ydb;
-template<class TEvRequest, class TEvImplRequest, class TEvImplResponse, class TResult>
-class TYandexQueryRPC : public TRpcOperationRequestActor<TYandexQueryRPC<TEvRequest, TEvImplRequest, TEvImplResponse, TResult>, TEvRequest> {
-private:
- const TVector<TString>& Sids;
+template <typename RpcRequestType, typename EvRequestType, typename EvResponseType>
+class TYandexQueryRequestRPC : public TRpcOperationRequestActor<
+ TYandexQueryRequestRPC<RpcRequestType,EvRequestType,EvResponseType>, RpcRequestType> {
+
+public:
+ using TBase = TRpcOperationRequestActor<
+ TYandexQueryRequestRPC<RpcRequestType,EvRequestType,EvResponseType>,
+ RpcRequestType>;
+ using TBase::Become;
+ using TBase::Send;
+ using TBase::PassAway;
+ using TBase::Request_;
+ using TBase::GetProtoRequest;
+private:
TString Token;
TString FolderId;
TString User;
@@ -36,68 +43,58 @@ private:
TString RequestId;
public:
- using TBase = TRpcOperationRequestActor<TYandexQueryRPC<TEvRequest, TEvImplRequest, TEvImplResponse, TResult>, TEvRequest>;
- using TBase::TBase;
- using TBase::GetProtoRequest;
- using TBase::Request_;
- using TBase::PassAway;
- using TBase::Send;
- using TBase::Become;
-
- TYandexQueryRPC(IRequestOpCtx* request, const TVector<TString>& sids)
- : TBase(request)
- , Sids(sids)
- { }
+ TYandexQueryRequestRPC(IRequestOpCtx* request)
+ : TBase(request) {}
void Bootstrap() {
- auto requestQuery = dynamic_cast<TEvRequest*>(Request_.get());
- Y_VERIFY(requestQuery);
+ auto requestCtx = Request_.get();
+
+ auto request = dynamic_cast<RpcRequestType*>(requestCtx);
+ Y_VERIFY(request);
+
+ auto proxyCtx = dynamic_cast<IRequestProxyCtx*>(requestCtx);
+ Y_VERIFY(proxyCtx);
- PeerName = requestQuery->GetPeerName();
- UserAgent = requestQuery->GetPeerMetaValues("user-agent").GetOrElse("empty");
- RequestId = requestQuery->GetPeerMetaValues("x-request-id").GetOrElse(CreateGuidAsString());
+ PeerName = Request_->GetPeerName();
+ UserAgent = Request_->GetPeerMetaValues("user-agent").GetOrElse("empty");
+ RequestId = Request_->GetPeerMetaValues("x-request-id").GetOrElse(CreateGuidAsString());
- TMaybe<TString> authToken = requestQuery->GetYdbToken();
+ TMaybe<TString> authToken = proxyCtx->GetYdbToken();
if (!authToken) {
- Request_->RaiseIssue(NYql::TIssue("token is empty"));
- ReplyWithResult(StatusIds::BAD_REQUEST);
+ ReplyWithStatus("token is empty", StatusIds::BAD_REQUEST);
return;
}
Token = *authToken;
- const TString scope = requestQuery->GetPeerMetaValues("x-yq-scope").GetOrElse("");
+ const TString scope = Request_->GetPeerMetaValues("x-yq-scope").GetOrElse("");
if (!scope.StartsWith("yandexcloud://")) {
- Request_->RaiseIssue(NYql::TIssue("x-yq-scope should start with yandexcloud:// but got " + scope));
- ReplyWithResult(StatusIds::BAD_REQUEST);
+ ReplyWithStatus("x-yq-scope should start with yandexcloud:// but got " + scope, StatusIds::BAD_REQUEST);
return;
}
const TVector<TString> path = StringSplitter(scope).Split('/').SkipEmpty();
- if (path.size() != 2 && path.size() != 3) { // todo: do not check against 3, backward compatibility
- Request_->RaiseIssue(NYql::TIssue("x-yq-scope format is invalid. Must be yandexcloud://folder_id, but got " + scope));
- ReplyWithResult(StatusIds::BAD_REQUEST);
+ if (path.size() != 2) {
+ ReplyWithStatus("x-yq-scope format is invalid. Must be yandexcloud://folder_id, but got " + scope, StatusIds::BAD_REQUEST);
return;
}
FolderId = path.back();
if (!FolderId) {
- Request_->RaiseIssue(NYql::TIssue("folder id is empty"));
- ReplyWithResult(StatusIds::BAD_REQUEST);
+ ReplyWithStatus("folder id is empty", StatusIds::BAD_REQUEST);
return;
}
if (FolderId.length() > 1024) {
- Request_->RaiseIssue(NYql::TIssue("folder id length greater than 1024 characters: " + FolderId));
- ReplyWithResult(StatusIds::BAD_REQUEST);
+ ReplyWithStatus("folder id length greater than 1024 characters: " + FolderId, StatusIds::BAD_REQUEST);
return;
}
- const TString& internalToken = requestQuery->GetInternalToken();
+ const TString& internalToken = proxyCtx->GetInternalToken();
TVector<TString> permissions;
if (internalToken) {
NACLib::TUserToken userToken(internalToken);
User = userToken.GetUserSID();
- for (const auto& sid: Sids) {
+ for (const auto& sid: request->Sids) {
if (userToken.IsExist(sid)) {
permissions.push_back(sid);
}
@@ -105,40 +102,45 @@ public:
}
if (!User) {
- Request_->RaiseIssue(NYql::TIssue("Authorization error. Permission denied"));
- ReplyWithResult(StatusIds::UNAUTHORIZED);
+ ReplyWithStatus("Authorization error. Permission denied", StatusIds::UNAUTHORIZED);
return;
}
- auto request = std::make_unique<TEvImplRequest>(FolderId, *GetProtoRequest(), User, Token, permissions);
- Send(NYq::ControlPlaneProxyActorId(), request.release());
-
- Become(&TYandexQueryRPC::StateFunc);
+ const auto req = GetProtoRequest();
+ auto ev = MakeHolder<EvRequestType>(FolderId, *req, User, Token, permissions);
+ Send(NYq::ControlPlaneProxyActorId(), ev.Release());
+ Become(&TYandexQueryRequestRPC<RpcRequestType, EvRequestType, EvResponseType>::StateFunc);
}
+ void ReplyWithStatus(const TString& issueMessage, StatusIds::StatusCode status) {
+ Request_->RaiseIssue(NYql::TIssue(issueMessage));
+ Request_->ReplyWithYdbStatus(status);
+ PassAway();
+ }
+
private:
STRICT_STFUNC(StateFunc,
- hFunc(TEvImplResponse, Handler);
+ hFunc(EvResponseType, Handle);
)
- template<typename T>
- void ProcessResponse(const T& response) {
+ template <typename TResponse, typename TReq>
+ void SendResponse(const TResponse& response, TReq& req) {
if (response.Issues) {
- Request_->RaiseIssues(response.Issues);
- ReplyWithResult(StatusIds::BAD_REQUEST);
+ req.RaiseIssues(response.Issues);
+ req.ReplyWithYdbStatus(StatusIds::BAD_REQUEST);
} else {
- ReplyWithResult(StatusIds::SUCCESS, response.Result);
- }
+ req.SendResult(response.Result, StatusIds::SUCCESS);
+ }
}
- template<typename T> requires requires (T t) { t.AuditDetails; }
- void ProcessResponse(const T& response) {
+ template <typename TResponse, typename TReq> requires requires (TResponse r) { r.AuditDetails; }
+ void SendResponse(const TResponse& response, TReq& req) {
if (response.Issues) {
- Request_->RaiseIssues(response.Issues);
- ReplyWithResult(StatusIds::BAD_REQUEST);
+ req.RaiseIssues(response.Issues);
+ req.ReplyWithYdbStatus(StatusIds::BAD_REQUEST);
} else {
- ReplyWithResult(StatusIds::SUCCESS, response.Result);
- }
+ req.SendResult(response.Result, StatusIds::SUCCESS);
+ }
NYq::TEvAuditService::TExtraInfo extraInfo{
.Token = Token,
@@ -156,719 +158,199 @@ private:
response.AuditDetails));
}
- void Handler(typename TEvImplResponse::TPtr& ev) {
- const auto& response = *ev->Get();
- ProcessResponse(response);
- }
-
- void ReplyWithResult(StatusIds::StatusCode status) {
- Request_->ReplyWithYdbStatus(status);
- PassAway();
- }
-
- void ReplyWithResult(StatusIds::StatusCode status, const TResult& result) {
- Request_->SendResult(result, status);
+ void Handle(typename EvResponseType::TPtr& ev) {
+ SendResponse(*ev->Get(), *Request_);
PassAway();
}
};
-void TGRpcRequestProxy::Handle(TEvYandexQueryCreateQueryRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.queries.create@as",
- "yq.queries.invoke@as",
- "yq.connections.use@as",
- "yq.bindings.use@as",
- "yq.resources.managePublic@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryCreateQueryRequest,
- NYq::TEvControlPlaneProxy::TEvCreateQueryRequest,
- NYq::TEvControlPlaneProxy::TEvCreateQueryResponse,
- YandexQuery::CreateQueryResult>(ev->Release().Release(), permissions));
-}
-
-void TGRpcRequestProxy::Handle(TEvYandexQueryListQueriesRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.queries.get@as",
- "yq.resources.viewPublic@as",
- "yq.resources.viewPrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryListQueriesRequest,
- NYq::TEvControlPlaneProxy::TEvListQueriesRequest,
- NYq::TEvControlPlaneProxy::TEvListQueriesResponse,
- YandexQuery::ListQueriesResult>(ev->Release().Release(), permissions));
-}
+using TYandexQueryCreateQueryRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::CreateQueryRequest, YandexQuery::CreateQueryResponse>,
+ NYq::TEvControlPlaneProxy::TEvCreateQueryRequest,
+ NYq::TEvControlPlaneProxy::TEvCreateQueryResponse>;
-void TGRpcRequestProxy::Handle(TEvYandexQueryDescribeQueryRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.queries.get@as",
- "yq.queries.viewAst@as",
- "yq.resources.viewPublic@as",
- "yq.resources.viewPrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryDescribeQueryRequest,
- NYq::TEvControlPlaneProxy::TEvDescribeQueryRequest,
- NYq::TEvControlPlaneProxy::TEvDescribeQueryResponse,
- YandexQuery::DescribeQueryResult>(ev->Release().Release(), permissions));
+void DoYandexQueryCreateQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryCreateQueryRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvYandexQueryGetQueryStatusRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.queries.getStatus@as",
- "yq.resources.viewPublic@as",
- "yq.resources.viewPrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryGetQueryStatusRequest,
- NYq::TEvControlPlaneProxy::TEvGetQueryStatusRequest,
- NYq::TEvControlPlaneProxy::TEvGetQueryStatusResponse,
- YandexQuery::GetQueryStatusResult>(ev->Release().Release(), permissions));
-}
+using TYandexQueryListQueriesRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::ListQueriesRequest, YandexQuery::ListQueriesResponse>,
+ NYq::TEvControlPlaneProxy::TEvListQueriesRequest,
+ NYq::TEvControlPlaneProxy::TEvListQueriesResponse>;
-void TGRpcRequestProxy::Handle(TEvYandexQueryModifyQueryRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.queries.update@as",
- "yq.queries.invoke@as",
- "yq.resources.managePublic@as",
- "yq.resources.managePrivate@as",
- "yq.connections.use@as",
- "yq.bindings.use@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryModifyQueryRequest,
- NYq::TEvControlPlaneProxy::TEvModifyQueryRequest,
- NYq::TEvControlPlaneProxy::TEvModifyQueryResponse,
- YandexQuery::ModifyQueryResult>(ev->Release().Release(), permissions));
+void DoYandexQueryListQueriesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryListQueriesRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvYandexQueryDeleteQueryRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.queries.delete@as",
- "yq.resources.managePublic@as",
- "yq.resources.managePrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryDeleteQueryRequest,
- NYq::TEvControlPlaneProxy::TEvDeleteQueryRequest,
- NYq::TEvControlPlaneProxy::TEvDeleteQueryResponse,
- YandexQuery::DeleteQueryResult>(ev->Release().Release(), permissions));
-}
+using TYandexQueryDescribeQueryRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::DescribeQueryRequest, YandexQuery::DescribeQueryResponse>,
+ NYq::TEvControlPlaneProxy::TEvDescribeQueryRequest,
+ NYq::TEvControlPlaneProxy::TEvDescribeQueryResponse>;
-void TGRpcRequestProxy::Handle(TEvYandexQueryControlQueryRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.queries.control@as",
- "yq.resources.managePublic@as",
- "yq.resources.managePrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryControlQueryRequest,
- NYq::TEvControlPlaneProxy::TEvControlQueryRequest,
- NYq::TEvControlPlaneProxy::TEvControlQueryResponse,
- YandexQuery::ControlQueryResult>(ev->Release().Release(), permissions));
+void DoYandexQueryDescribeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryDescribeQueryRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvYandexQueryGetResultDataRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.queries.getData@as",
- "yq.resources.viewPublic@as",
- "yq.resources.viewPrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryGetResultDataRequest,
- NYq::TEvControlPlaneProxy::TEvGetResultDataRequest,
- NYq::TEvControlPlaneProxy::TEvGetResultDataResponse,
- YandexQuery::GetResultDataResult>(ev->Release().Release(), permissions));
-}
+using TYandexQueryGetQueryStatusRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::GetQueryStatusRequest, YandexQuery::GetQueryStatusResponse>,
+ NYq::TEvControlPlaneProxy::TEvGetQueryStatusRequest,
+ NYq::TEvControlPlaneProxy::TEvGetQueryStatusResponse>;
-void TGRpcRequestProxy::Handle(TEvYandexQueryListJobsRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.jobs.get@as",
- "yq.resources.viewPublic@as",
- "yq.resources.viewPrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryListJobsRequest,
- NYq::TEvControlPlaneProxy::TEvListJobsRequest,
- NYq::TEvControlPlaneProxy::TEvListJobsResponse,
- YandexQuery::ListJobsResult>(ev->Release().Release(), permissions));
+void DoYandexQueryGetQueryStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryGetQueryStatusRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvYandexQueryDescribeJobRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.jobs.get@as",
- "yq.resources.viewPublic@as",
- "yq.resources.viewPrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryDescribeJobRequest,
- NYq::TEvControlPlaneProxy::TEvDescribeJobRequest,
- NYq::TEvControlPlaneProxy::TEvDescribeJobResponse,
- YandexQuery::DescribeJobResult>(ev->Release().Release(), permissions));
-}
+using TYandexQueryModifyQueryRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::ModifyQueryRequest, YandexQuery::ModifyQueryResponse>,
+ NYq::TEvControlPlaneProxy::TEvModifyQueryRequest,
+ NYq::TEvControlPlaneProxy::TEvModifyQueryResponse>;
-void TGRpcRequestProxy::Handle(TEvYandexQueryCreateConnectionRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.connections.create@as",
- "yq.resources.managePublic@as",
-
- // service account permissions
- "iam.serviceAccounts.use@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryCreateConnectionRequest,
- NYq::TEvControlPlaneProxy::TEvCreateConnectionRequest,
- NYq::TEvControlPlaneProxy::TEvCreateConnectionResponse,
- YandexQuery::CreateConnectionResult>(ev->Release().Release(), permissions));
+void DoYandexQueryModifyQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryModifyQueryRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvYandexQueryListConnectionsRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.connections.get@as",
- "yq.resources.viewPublic@as",
- "yq.resources.viewPrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryListConnectionsRequest,
- NYq::TEvControlPlaneProxy::TEvListConnectionsRequest,
- NYq::TEvControlPlaneProxy::TEvListConnectionsResponse,
- YandexQuery::ListConnectionsResult>(ev->Release().Release(), permissions));
-}
+using TYandexQueryDeleteQueryRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::DeleteQueryRequest, YandexQuery::DeleteQueryResponse>,
+ NYq::TEvControlPlaneProxy::TEvDeleteQueryRequest,
+ NYq::TEvControlPlaneProxy::TEvDeleteQueryResponse>;
-void TGRpcRequestProxy::Handle(TEvYandexQueryDescribeConnectionRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.connections.get@as",
- "yq.resources.viewPublic@as",
- "yq.resources.viewPrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryDescribeConnectionRequest,
- NYq::TEvControlPlaneProxy::TEvDescribeConnectionRequest,
- NYq::TEvControlPlaneProxy::TEvDescribeConnectionResponse,
- YandexQuery::DescribeConnectionResult>(ev->Release().Release(), permissions));
+void DoYandexQueryDeleteQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryDeleteQueryRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvYandexQueryModifyConnectionRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.connections.update@as",
- "yq.resources.managePublic@as",
- "yq.resources.managePrivate@as",
-
- // service account permissions
- "iam.serviceAccounts.use@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryModifyConnectionRequest,
- NYq::TEvControlPlaneProxy::TEvModifyConnectionRequest,
- NYq::TEvControlPlaneProxy::TEvModifyConnectionResponse,
- YandexQuery::ModifyConnectionResult>(ev->Release().Release(), permissions));
-}
+using TYandexQueryControlQueryRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::ControlQueryRequest, YandexQuery::ControlQueryResponse>,
+ NYq::TEvControlPlaneProxy::TEvControlQueryRequest,
+ NYq::TEvControlPlaneProxy::TEvControlQueryResponse>;
-void TGRpcRequestProxy::Handle(TEvYandexQueryDeleteConnectionRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.connections.delete@as",
- "yq.resources.managePublic@as",
- "yq.resources.managePrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryDeleteConnectionRequest,
- NYq::TEvControlPlaneProxy::TEvDeleteConnectionRequest,
- NYq::TEvControlPlaneProxy::TEvDeleteConnectionResponse,
- YandexQuery::DeleteConnectionResult>(ev->Release().Release(), permissions));
+void DoYandexQueryControlQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryControlQueryRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvYandexQueryTestConnectionRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.connections.create@as",
-
- // service account permissions
- "iam.serviceAccounts.use@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryTestConnectionRequest,
- NYq::TEvControlPlaneProxy::TEvTestConnectionRequest,
- NYq::TEvControlPlaneProxy::TEvTestConnectionResponse,
- YandexQuery::TestConnectionResult>(ev->Release().Release(), permissions));
-}
+using TYandexQueryGetResultDataRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::GetResultDataRequest, YandexQuery::GetResultDataResponse>,
+ NYq::TEvControlPlaneProxy::TEvGetResultDataRequest,
+ NYq::TEvControlPlaneProxy::TEvGetResultDataResponse>;
-void TGRpcRequestProxy::Handle(TEvYandexQueryCreateBindingRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.bindings.create@as",
- "yq.resources.managePublic@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryCreateBindingRequest,
- NYq::TEvControlPlaneProxy::TEvCreateBindingRequest,
- NYq::TEvControlPlaneProxy::TEvCreateBindingResponse,
- YandexQuery::CreateBindingResult>(ev->Release().Release(), permissions));
+void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryGetResultDataRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvYandexQueryListBindingsRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.bindings.get@as",
- "yq.resources.viewPublic@as",
- "yq.resources.viewPrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryListBindingsRequest,
- NYq::TEvControlPlaneProxy::TEvListBindingsRequest,
- NYq::TEvControlPlaneProxy::TEvListBindingsResponse,
- YandexQuery::ListBindingsResult>(ev->Release().Release(), permissions));
-}
+using TYandexQueryListJobsRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::ListJobsRequest, YandexQuery::ListJobsResponse>,
+ NYq::TEvControlPlaneProxy::TEvListJobsRequest,
+ NYq::TEvControlPlaneProxy::TEvListJobsResponse>;
-void TGRpcRequestProxy::Handle(TEvYandexQueryDescribeBindingRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.bindings.get@as",
- "yq.resources.viewPublic@as",
- "yq.resources.viewPrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryDescribeBindingRequest,
- NYq::TEvControlPlaneProxy::TEvDescribeBindingRequest,
- NYq::TEvControlPlaneProxy::TEvDescribeBindingResponse,
- YandexQuery::DescribeBindingResult>(ev->Release().Release(), permissions));
+void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryListJobsRPC(p.release()));
}
-void TGRpcRequestProxy::Handle(TEvYandexQueryModifyBindingRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.bindings.update@as",
- "yq.resources.managePublic@as",
- "yq.resources.managePrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryModifyBindingRequest,
- NYq::TEvControlPlaneProxy::TEvModifyBindingRequest,
- NYq::TEvControlPlaneProxy::TEvModifyBindingResponse,
- YandexQuery::ModifyBindingResult>(ev->Release().Release(), permissions));
-}
+using TYandexQueryDescribeJobRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::DescribeJobRequest, YandexQuery::DescribeJobResponse>,
+ NYq::TEvControlPlaneProxy::TEvDescribeJobRequest,
+ NYq::TEvControlPlaneProxy::TEvDescribeJobResponse>;
-void TGRpcRequestProxy::Handle(TEvYandexQueryDeleteBindingRequest::TPtr& ev, const TActorContext& ctx) {
- static const TVector<TString> permissions = {
- // folder permissions
- "yq.bindings.delete@as",
- "yq.resources.managePublic@as",
- "yq.resources.managePrivate@as"
- };
- ctx.Register(new TYandexQueryRPC<TEvYandexQueryDeleteBindingRequest,
- NYq::TEvControlPlaneProxy::TEvDeleteBindingRequest,
- NYq::TEvControlPlaneProxy::TEvDeleteBindingResponse,
- YandexQuery::DeleteBindingResult>(ev->Release().Release(), permissions));
+void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryDescribeJobRPC(p.release()));
}
-template<typename T>
-TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetAdditionalEntries(const T&)
-{
- return {};
-}
+using TYandexQueryCreateConnectionRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::CreateConnectionRequest, YandexQuery::CreateConnectionResponse>,
+ NYq::TEvControlPlaneProxy::TEvCreateConnectionRequest,
+ NYq::TEvControlPlaneProxy::TEvCreateConnectionResponse>;
-template<>
-TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetAdditionalEntries(const TEvYandexQueryCreateConnectionRequest& request)
-{
- TString serviceAccountId = NYq::ExtractServiceAccountId(*request.GetProtoRequest());
- if (serviceAccountId) {
- return {{
- {"iam.serviceAccounts.use"},
- {
- {"service_account_id", serviceAccountId},
- {"database_id", "db"}
- }
- }};
- }
- return {};
+void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryCreateConnectionRPC(p.release()));
}
-template<>
-TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetAdditionalEntries(const TEvYandexQueryModifyConnectionRequest& request)
-{
- TString serviceAccountId = NYq::ExtractServiceAccountId(*request.GetProtoRequest());
- if (serviceAccountId) {
- return {{
- {"iam.serviceAccounts.use"},
- {
- {"service_account_id", serviceAccountId},
- {"database_id", "db"}
- }
- }};
- }
- return {};
-}
+using TYandexQueryListConnectionsRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::ListConnectionsRequest, YandexQuery::ListConnectionsResponse>,
+ NYq::TEvControlPlaneProxy::TEvListConnectionsRequest,
+ NYq::TEvControlPlaneProxy::TEvListConnectionsResponse>;
-template<>
-TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> GetAdditionalEntries(const TEvYandexQueryTestConnectionRequest& request)
-{
- TString serviceAccountId = NYq::ExtractServiceAccountId(*request.GetProtoRequest());
- if (serviceAccountId) {
- return {{
- {"iam.serviceAccounts.use"},
- {
- {"service_account_id", serviceAccountId},
- {"database_id", "db"}
- }
- }};
- }
- return {};
+void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryListConnectionsRPC(p.release()));
}
-template <typename TEvent>
-void TGrpcRequestCheckActor<TEvent>::InitializeAttributesFromYandexQuery(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- CheckedDatabaseName_ = CanonizePath(schemeData.GetPath());
- const TString scope = Request_->Get()->GetPeerMetaValues("x-yq-scope").GetOrElse("");
- if (scope.StartsWith("yandexcloud://")) {
- const TVector<TString> path = StringSplitter(scope).Split('/').SkipEmpty();
- if (path.size() == 2 || path.size() == 3) {
- const TString& folderId = path.back();
- TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> entries {{
- GetPermissions(),
- {
- {"folder_id", folderId},
- {"database_id", "db"}
- }
- }};
+using TYandexQueryDescribeConnectionRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::DescribeConnectionRequest, YandexQuery::DescribeConnectionResponse>,
+ NYq::TEvControlPlaneProxy::TEvDescribeConnectionRequest,
+ NYq::TEvControlPlaneProxy::TEvDescribeConnectionResponse>;
- const auto& request = *Request_->Get();
- const auto additionalEntries = GetAdditionalEntries(request);
- entries.insert(entries.end(), additionalEntries.begin(), additionalEntries.end());
- TBase::SetEntries(entries);
- }
- }
+void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryDescribeConnectionRPC(p.release()));
}
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryCreateQueryRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.queries.create",
- "yq.queries.invoke",
- "yq.connections.use",
- "yq.bindings.use",
- "yq.resources.managePublic"
- };
- return permissions;
-}
+using TYandexQueryModifyConnectionRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::ModifyConnectionRequest, YandexQuery::ModifyConnectionResponse>,
+ NYq::TEvControlPlaneProxy::TEvModifyConnectionRequest,
+ NYq::TEvControlPlaneProxy::TEvModifyConnectionResponse>;
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListQueriesRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.queries.get",
- "yq.resources.viewPublic",
- "yq.resources.viewPrivate"
- };
- return permissions;
+void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryModifyConnectionRPC(p.release()));
}
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeQueryRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.queries.get",
- "yq.queries.viewAst",
- "yq.resources.viewPublic",
- "yq.resources.viewPrivate"
- };
- return permissions;
-}
+using TYandexQueryDeleteConnectionRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::DeleteConnectionRequest, YandexQuery::DeleteConnectionResponse>,
+ NYq::TEvControlPlaneProxy::TEvDeleteConnectionRequest,
+ NYq::TEvControlPlaneProxy::TEvDeleteConnectionResponse>;
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryGetQueryStatusRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "yq.queries.getStatus",
- "yq.resources.viewPublic",
- "yq.resources.viewPrivate"
- };
- return permissions;
+void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryDeleteConnectionRPC(p.release()));
}
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryModifyQueryRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.queries.update",
- "yq.queries.invoke",
- "yq.resources.managePublic",
- "yq.resources.managePrivate",
- "yq.connections.use",
- "yq.bindings.use"
- };
- return permissions;
-}
+using TYandexQueryTestConnectionRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::TestConnectionRequest, YandexQuery::TestConnectionResponse>,
+ NYq::TEvControlPlaneProxy::TEvTestConnectionRequest,
+ NYq::TEvControlPlaneProxy::TEvTestConnectionResponse>;
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDeleteQueryRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.queries.delete",
- "yq.resources.managePublic",
- "yq.resources.managePrivate"
- };
- return permissions;
+void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryTestConnectionRPC(p.release()));
}
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryControlQueryRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.queries.control",
- "yq.resources.managePublic",
- "yq.resources.managePrivate"
- };
- return permissions;
-}
+using TYandexQueryCreateBindingRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::CreateBindingRequest, YandexQuery::CreateBindingResponse>,
+ NYq::TEvControlPlaneProxy::TEvCreateBindingRequest,
+ NYq::TEvControlPlaneProxy::TEvCreateBindingResponse>;
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryGetResultDataRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.queries.getData",
- "yq.resources.viewPublic",
- "yq.resources.viewPrivate"
- };
- return permissions;
+void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& ) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryCreateBindingRPC(p.release()));
}
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListJobsRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.jobs.get",
- "yq.resources.viewPublic",
- "yq.resources.viewPrivate"
- };
- return permissions;
-}
+using TYandexQueryListBindingsRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::ListBindingsRequest, YandexQuery::ListBindingsResponse>,
+ NYq::TEvControlPlaneProxy::TEvListBindingsRequest,
+ NYq::TEvControlPlaneProxy::TEvListBindingsResponse>;
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeJobRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.jobs.get",
- "yq.resources.viewPublic",
- "yq.resources.viewPrivate"
- };
- return permissions;
+void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryListBindingsRPC(p.release()));
}
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryCreateConnectionRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.connections.create",
- "yq.resources.managePublic"
- };
- return permissions;
-}
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListConnectionsRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.connections.get",
- "yq.resources.viewPublic",
- "yq.resources.viewPrivate"
- };
- return permissions;
-}
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeConnectionRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.connections.get",
- "yq.resources.viewPublic",
- "yq.resources.viewPrivate"
- };
- return permissions;
-}
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryModifyConnectionRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.connections.update",
- "yq.resources.managePublic",
- "yq.resources.managePrivate"
- };
- return permissions;
-}
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDeleteConnectionRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.connections.delete",
- "yq.resources.managePublic",
- "yq.resources.managePrivate"
- };
- return permissions;
-}
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryTestConnectionRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "yq.connections.create"
- };
- return permissions;
-}
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryCreateBindingRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.bindings.create",
- "yq.resources.managePublic"
- };
- return permissions;
-}
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryListBindingsRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.bindings.get",
- "yq.resources.viewPublic",
- "yq.resources.viewPrivate"
- };
- return permissions;
-}
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDescribeBindingRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.bindings.get",
- "yq.resources.viewPublic",
- "yq.resources.viewPrivate"
- };
- return permissions;
-}
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryModifyBindingRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.bindings.update",
- "yq.resources.managePublic",
- "yq.resources.managePrivate"
- };
- return permissions;
-}
-
-template <>
-const TVector<TString>& TGrpcRequestCheckActor<TEvYandexQueryDeleteBindingRequest>::GetPermissions() {
- static const TVector<TString> permissions = {
- "ydb.tables.list", // TODO: delete after enabling permission validation
- "yq.bindings.delete",
- "yq.resources.managePublic",
- "yq.resources.managePrivate"
- };
- return permissions;
-}
+using TYandexQueryDescribeBindingRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::DescribeBindingRequest, YandexQuery::DescribeBindingResponse>,
+ NYq::TEvControlPlaneProxy::TEvDescribeBindingRequest,
+ NYq::TEvControlPlaneProxy::TEvDescribeBindingResponse>;
-// yq behavior
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryCreateQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
+void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryDescribeBindingRPC(p.release()));
}
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryListQueriesRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryDescribeQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryGetQueryStatusRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryModifyQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
+using TYandexQueryModifyBindingRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::ModifyBindingRequest, YandexQuery::ModifyBindingResponse>,
+ NYq::TEvControlPlaneProxy::TEvModifyBindingRequest,
+ NYq::TEvControlPlaneProxy::TEvModifyBindingResponse>;
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryDeleteQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
+void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryModifyBindingRPC(p.release()));
}
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryControlQueryRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryGetResultDataRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryListJobsRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryDescribeJobRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryCreateConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryListConnectionsRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryDescribeConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryModifyConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryDeleteConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryTestConnectionRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryCreateBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryListBindingsRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryDescribeBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
-
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryModifyBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
-}
+using TYandexQueryDeleteBindingRPC = TYandexQueryRequestRPC<
+ TGrpcYqRequestOperationCall<YandexQuery::DeleteBindingRequest, YandexQuery::DeleteBindingResponse>,
+ NYq::TEvControlPlaneProxy::TEvDeleteBindingRequest,
+ NYq::TEvControlPlaneProxy::TEvDeleteBindingResponse>;
-template <>
-void TGrpcRequestCheckActor<TEvYandexQueryDeleteBindingRequest>::InitializeAttributes(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData) {
- InitializeAttributesFromYandexQuery(schemeData);
+void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TYandexQueryDeleteBindingRPC(p.release()));
}
} // namespace NGRpcService
diff --git a/ydb/core/grpc_services/service_datastreams.h b/ydb/core/grpc_services/service_datastreams.h
new file mode 100644
index 0000000000..7d8488c30e
--- /dev/null
+++ b/ydb/core/grpc_services/service_datastreams.h
@@ -0,0 +1,46 @@
+#pragma once
+#include <memory>
+
+namespace NActors {
+struct TActorId;
+}
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class IRequestOpCtx;
+class IFacilityProvider;
+
+void DoDataStreamsCreateStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsDeleteStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsDescribeStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsPutRecordRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsRegisterStreamConsumerRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsDeregisterStreamConsumerRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsDescribeStreamConsumerRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsListStreamsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsListShardsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsPutRecordsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsGetRecordsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsGetShardIteratorRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsSubscribeToShardRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsDescribeLimitsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsDescribeStreamSummaryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsDecreaseStreamRetentionPeriodRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsIncreaseStreamRetentionPeriodRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsUpdateShardCountRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsListStreamConsumersRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsAddTagsToStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsDisableEnhancedMonitoringRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsEnableEnhancedMonitoringRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsListTagsForStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsUpdateStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsSetWriteQuotaRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsMergeShardsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsRemoveTagsFromStreamRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsSplitShardRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsStartStreamEncryptionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+void DoDataStreamsStopStreamEncryptionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&);
+
+}
+}
diff --git a/ydb/core/grpc_services/service_yq.h b/ydb/core/grpc_services/service_yq.h
new file mode 100644
index 0000000000..8a9f96cdf2
--- /dev/null
+++ b/ydb/core/grpc_services/service_yq.h
@@ -0,0 +1,92 @@
+#pragma once
+
+#include <algorithm>
+#include <memory>
+
+#include <ydb/core/base/ticket_parser.h>
+#include <ydb/core/yq/libs/control_plane_proxy/utils.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class IRequestOpCtx;
+class IFacilityProvider;
+
+template <typename TReq, typename TResp>
+class TGrpcYqRequestOperationCall : public TGrpcRequestOperationCall<TReq, TResp> {
+
+public:
+ using TBase = TGrpcRequestOperationCall<TReq, TResp>;
+ using TBase::GetProtoRequest;
+ using TBase::GetPeerMetaValues;
+
+ const TVector<TString>& Permissions;
+ TVector<TString> Sids;
+
+ TGrpcYqRequestOperationCall(NGrpc::IRequestContextBase* ctx,
+ void (*cb)(std::unique_ptr<IRequestOpCtx>, const IFacilityProvider&),
+ const TVector<TString>& permissions)
+ : TGrpcRequestOperationCall<TReq, TResp>(ctx, cb, {}), Permissions(permissions) {
+ }
+
+ bool TryCustomAttributeProcess(const TSchemeBoardEvents::TDescribeSchemeResult& , ICheckerIface* iface) override {
+
+ const TString scope = GetPeerMetaValues("x-yq-scope").GetOrElse("");
+ if (scope.StartsWith("yandexcloud://")) {
+ const TVector<TString> path = StringSplitter(scope).Split('/').SkipEmpty();
+ if (path.size() == 2) {
+ const TString& folderId = path.back();
+ TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> entries {{
+ Permissions,
+ {
+ {"folder_id", folderId},
+ {"database_id", "db"}
+ }
+ }};
+ std::transform(Permissions.begin(), Permissions.end(), std::back_inserter(Sids),
+ [](const TString& s) -> TString { return s + "@as"; });
+
+ auto serviceAccountId = NYq::ExtractServiceAccountId(*GetProtoRequest());
+ if (serviceAccountId) {
+ entries.push_back({
+ {"iam.serviceAccounts.use"},
+ {
+ {"service_account_id", serviceAccountId},
+ {"database_id", "db"}
+ }});
+ Sids.push_back("iam.serviceAccounts.use@as");
+ }
+
+ iface->SetEntries(entries);
+ return true;
+ }
+ }
+
+ return false;
+ }
+};
+
+void DoYandexQueryCreateQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryListQueriesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryDescribeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryGetQueryStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryModifyQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryDeleteQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryControlQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+
+} // namespace NGRpcService
+} // namespace NKikimr
diff --git a/ydb/core/http_proxy/auth_factory.cpp b/ydb/core/http_proxy/auth_factory.cpp
index 3ff3e3055e..03f315cad3 100644
--- a/ydb/core/http_proxy/auth_factory.cpp
+++ b/ydb/core/http_proxy/auth_factory.cpp
@@ -2,7 +2,6 @@
#include "http_service.h"
#include "http_req.h"
#include "metrics_actor.h"
-#include "driver_cache_actor.h"
#include "discovery_actor.h"
#include <library/cpp/actors/http/http_proxy.h>
@@ -35,12 +34,7 @@ void TAuthFactory::Initialize(
const NYdb::TCredentialsProviderPtr credentialsProvider = NYdb::CreateInsecureCredentialsProviderFactory()->CreateProvider();
- IActor* actor = NKikimr::NHttpProxy::CreateStaticDiscoveryActor(grpcPort);
- localServices.push_back(std::pair<TActorId, TActorSetupCmd>(
- NKikimr::NHttpProxy::MakeTenantDiscoveryID(),
- TActorSetupCmd(actor, TMailboxType::HTSwap, appData.UserPoolId)));
-
- actor = NKikimr::NHttpProxy::CreateMetricsActor(NKikimr::NHttpProxy::TMetricsSettings{appData.Counters->GetSubgroup("counters", "http_proxy")});
+ auto actor = NKikimr::NHttpProxy::CreateMetricsActor(NKikimr::NHttpProxy::TMetricsSettings{appData.Counters->GetSubgroup("counters", "http_proxy")});
localServices.push_back(std::pair<TActorId, TActorSetupCmd>(
NKikimr::NHttpProxy::MakeMetricsServiceID(),
TActorSetupCmd(actor, TMailboxType::HTSwap, appData.UserPoolId)));
diff --git a/ydb/core/http_proxy/discovery_actor.cpp b/ydb/core/http_proxy/discovery_actor.cpp
index 7e85f4700c..674ed7af79 100644
--- a/ydb/core/http_proxy/discovery_actor.cpp
+++ b/ydb/core/http_proxy/discovery_actor.cpp
@@ -1,4 +1,3 @@
-#include "driver_cache_actor.h"
#include "events.h"
#include "discovery_actor.h"
diff --git a/ydb/core/http_proxy/driver_cache_actor.cpp b/ydb/core/http_proxy/driver_cache_actor.cpp
index 71f4799ace..e69de29bb2 100644
--- a/ydb/core/http_proxy/driver_cache_actor.cpp
+++ b/ydb/core/http_proxy/driver_cache_actor.cpp
@@ -1,80 +0,0 @@
-#include "driver_cache_actor.h"
-#include "events.h"
-#include "http_req.h"
-
-#include <library/cpp/actors/core/actor_bootstrapped.h>
-#include <library/cpp/actors/core/events.h>
-#include <library/cpp/actors/core/hfunc.h>
-#include <library/cpp/actors/http/http_proxy.h>
-#include <library/cpp/cache/cache.h>
-
-#include <util/stream/file.h>
-#include <util/string/builder.h>
-#include <util/string/vector.h>
-#include <util/system/hostname.h>
-
-namespace NKikimr::NHttpProxy {
-
- using namespace NActors;
-
-
- class TStaticDiscoveryActor : public NActors::TActorBootstrapped<TStaticDiscoveryActor> {
- using TBase = NActors::TActorBootstrapped<TStaticDiscoveryActor>;
- public:
- explicit TStaticDiscoveryActor(ui16 port)
- {
- Database.Reset(new TDatabase(TStringBuilder() << FQDNHostName() << ":" << port, "database_id", "", "cloud_id", "folder_id"));
- }
-
- void Bootstrap(const TActorContext& ctx) {
- TBase::Become(&TStaticDiscoveryActor::StateWork);
- Y_UNUSED(ctx);
- }
-
- private:
- STFUNC(StateWork) {
- switch (ev->GetTypeRewrite()) {
- HFunc(TEvServerlessProxy::TEvDiscoverDatabaseEndpointRequest, Handle);
- }
- }
-
- THolder<TDatabase> Database;
-
- TVector<std::pair<TActorId, TString>> Waiters;
-
- void Handle(TEvServerlessProxy::TEvDiscoverDatabaseEndpointRequest::TPtr& ev, const TActorContext& ctx);
-
- TStringBuilder LogPrefix() const {
- return TStringBuilder();
- }
-
- void ProcessWaiters(const TActorContext& ctx);
- };
-
-
- void TStaticDiscoveryActor::Handle(TEvServerlessProxy::TEvDiscoverDatabaseEndpointRequest::TPtr& ev, const TActorContext& ctx) {
- Waiters.push_back(std::make_pair(ev->Sender, ev->Get()->DatabasePath));
- if (!Database) {
- return;
- }
- ProcessWaiters(ctx);
- }
-
- void TStaticDiscoveryActor::ProcessWaiters(const TActorContext& ctx) {
- Y_VERIFY(Database);
- for (auto& waiter : Waiters) {
- auto result = MakeHolder<TEvServerlessProxy::TEvDiscoverDatabaseEndpointResult>();
- auto database = waiter.second;
- result->DatabaseInfo = MakeHolder<TDatabase>(*Database);
- result->DatabaseInfo->Path = database;
- result->Status = NYdb::EStatus::SUCCESS;
- ctx.Send(waiter.first, std::move(result));
- }
- Waiters.clear();
- }
-
- NActors::IActor* CreateStaticDiscoveryActor(ui16 port) {
- return new TStaticDiscoveryActor{port};
- }
-
-}
diff --git a/ydb/core/http_proxy/driver_cache_actor.h b/ydb/core/http_proxy/driver_cache_actor.h
index e162e6c146..e69de29bb2 100644
--- a/ydb/core/http_proxy/driver_cache_actor.h
+++ b/ydb/core/http_proxy/driver_cache_actor.h
@@ -1,8 +0,0 @@
-#pragma once
-
-#include <library/cpp/actors/core/actor.h>
-
-namespace NKikimr::NHttpProxy {
-
- NActors::IActor* CreateStaticDiscoveryActor(ui16 port);
-} // namespace
diff --git a/ydb/core/http_proxy/events.h b/ydb/core/http_proxy/events.h
index 12eaf2df32..cd32e92c34 100644
--- a/ydb/core/http_proxy/events.h
+++ b/ydb/core/http_proxy/events.h
@@ -117,9 +117,12 @@ namespace NKikimr::NHttpProxy {
TString ServiceAccountId;
TString IamToken;
- TEvToken(const TString& serviceAccountId, const TString& iamToken)
+ TString SerializedUserToken;
+
+ TEvToken(const TString& serviceAccountId, const TString& iamToken, const TString& serializedUserToken = "")
: ServiceAccountId(serviceAccountId)
, IamToken(iamToken)
+ , SerializedUserToken(serializedUserToken)
{}
};
diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp
index 6d775c012d..ea0b2f7aa2 100644
--- a/ydb/core/http_proxy/http_req.cpp
+++ b/ydb/core/http_proxy/http_req.cpp
@@ -1,4 +1,3 @@
-#include "driver_cache_actor.h"
#include "events.h"
#include "http_req.h"
#include "auth_factory.h"
@@ -6,14 +5,17 @@
#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
#include <ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/common.h>
#include <ydb/core/grpc_caching/cached_grpc_request_actor.h>
+#include <ydb/core/grpc_services/local_rpc/local_rpc.h>
#include <ydb/core/protos/serverless_proxy_config.pb.h>
#include <ydb/services/datastreams/shard_iterator.h>
#include <ydb/services/datastreams/next_token.h>
+#include <ydb/services/datastreams/datastreams_proxy.h>
#include <ydb/core/viewer/json/json.h>
#include <ydb/core/base/appdata.h>
#include <ydb/library/naming_conventions/naming_conventions.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/library/http_proxy/authorization/auth_helpers.h>
#include <ydb/library/http_proxy/error/error.h>
@@ -38,7 +40,6 @@
namespace NKikimr::NHttpProxy {
-
using namespace google::protobuf;
using namespace Ydb::DataStreams::V1;
using namespace NYdb::NDataStreams::V1;
@@ -556,7 +557,7 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
);
}
- template<class TProtoService, class TProtoRequest, class TProtoResponse, class TProtoResult, class TProtoCall>
+ template<class TProtoService, class TProtoRequest, class TProtoResponse, class TProtoResult, class TProtoCall, class TRpcEv>
class THttpRequestProcessor : public IHttpRequestProcessor {
public:
enum TRequestState {
@@ -630,6 +631,8 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
}
void SendYdbDriverRequest(const TActorContext& ctx) {
+ Y_VERIFY(HttpContext.Driver);
+
RequestState = StateAuthorization;
auto request = MakeHolder<TEvServerlessProxy::TEvDiscoverDatabaseEndpointRequest>();
@@ -675,7 +678,24 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
"sending grpc request to '" << HttpContext.DiscoveryEndpoint <<
"' database: '" << HttpContext.DatabaseName <<
"' iam token size: " << HttpContext.IamToken.size());
-
+ if (!HttpContext.Driver) {
+ RpcFuture = NRpcService::DoLocalRpc<TRpcEv>(std::move(Request), HttpContext.DatabaseName, HttpContext.SerializedUserToken, ctx.ActorSystem());
+ RpcFuture.Subscribe([actorId = ctx.SelfID, actorSystem = ctx.ActorSystem()](const NThreading::TFuture<TProtoResponse>& future) {
+ auto& response = future.GetValueSync();
+ auto result = MakeHolder<TEvServerlessProxy::TEvGrpcRequestResult>();
+ Y_VERIFY(response.operation().ready());
+ if (response.operation().status() == Ydb::StatusIds::SUCCESS) {
+ TProtoResult rs;
+ response.operation().result().UnpackTo(&rs);
+ result->Message = MakeHolder<TProtoResult>(rs);
+ }
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(response.operation().issues(), issues);
+ result->Status = MakeHolder<NYdb::TStatus>(NYdb::EStatus(response.operation().status()), std::move(issues));
+ actorSystem->Send(actorId, result.Release());
+ });
+ return;
+ }
Y_VERIFY(Client);
Y_VERIFY(DiscoveryFuture->HasValue());
@@ -703,7 +723,13 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
void HandleToken(TEvServerlessProxy::TEvToken::TPtr& ev, const TActorContext& ctx) {
HttpContext.ServiceAccountId = ev->Get()->ServiceAccountId;
HttpContext.IamToken = ev->Get()->IamToken;
- SendYdbDriverRequest(ctx);
+ HttpContext.SerializedUserToken = ev->Get()->SerializedUserToken;
+
+ if (HttpContext.Driver) {
+ SendYdbDriverRequest(ctx);
+ } else {
+ SendGrpcRequest(ctx);
+ }
}
void HandleError(TEvServerlessProxy::TEvError::TPtr& ev, const TActorContext& ctx) {
@@ -830,8 +856,12 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
"database '" << HttpContext.DatabaseName << "' " <<
"stream '" << ExtractStreamName<TProtoRequest>(Request) << "'");
- if (HttpContext.IamToken.empty()) {
- AuthActor = ctx.Register(AppData(ctx)->DataStreamsAuthFactory->CreateAuthActor(ctx.SelfID, HttpContext, std::move(Signature)));
+ if (HttpContext.IamToken.empty() || !HttpContext.Driver) { //use Signature or no sdk mode - then need to auth anyway
+ if (HttpContext.IamToken.empty() && !Signature) { //Test mode - no driver and no creds
+ SendGrpcRequest(ctx);
+ } else {
+ AuthActor = ctx.Register(AppData(ctx)->DataStreamsAuthFactory->CreateAuthActor(ctx.SelfID, HttpContext, std::move(Signature)));
+ }
} else {
SendYdbDriverRequest(ctx);
}
@@ -849,6 +879,7 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
THttpRequestContext HttpContext;
THolder<NKikimr::NSQS::TAwsRequestSignV4> Signature;
THolder<NThreading::TFuture<TProtoResultWrapper<TProtoResult>>> Future;
+ NThreading::TFuture<TProtoResponse> RpcFuture;
THolder<NThreading::TFuture<void>> DiscoveryFuture;
TProtoCall ProtoCall;
TString Method;
@@ -880,60 +911,37 @@ BuildLabels("", httpContext, "stream.put_records.failed_records_per_second")
void THttpRequestProcessors::Initialize() {
- Name2Processor["PutRecords"] = MakeHolder<THttpRequestProcessor<DataStreamsService, PutRecordsRequest, PutRecordsResponse, PutRecordsResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncPutRecords)>>("PutRecords", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncPutRecords);
- Name2Processor["CreateStream"] = MakeHolder<THttpRequestProcessor<DataStreamsService, CreateStreamRequest, CreateStreamResponse, CreateStreamResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncCreateStream)>>("CreateStream", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncCreateStream);
- Name2Processor["ListStreams"] = MakeHolder<THttpRequestProcessor<DataStreamsService, ListStreamsRequest, ListStreamsResponse, ListStreamsResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListStreams)>>("ListStreams", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListStreams);
- Name2Processor["DeleteStream"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DeleteStreamRequest, DeleteStreamResponse, DeleteStreamResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDeleteStream)>>("DeleteStream", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDeleteStream);
- Name2Processor["DescribeStream"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DescribeStreamRequest, DescribeStreamResponse, DescribeStreamResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeStream)>>("DescribeStream", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeStream);
- Name2Processor["ListShards"] = MakeHolder<THttpRequestProcessor<DataStreamsService, ListShardsRequest, ListShardsResponse, ListShardsResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListShards)>>("ListShards", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListShards);
- Name2Processor["PutRecord"] = MakeHolder<THttpRequestProcessor<DataStreamsService, PutRecordRequest, PutRecordResponse, PutRecordResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncPutRecord)>>("PutRecord", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncPutRecord);
- Name2Processor["GetRecords"] = MakeHolder<THttpRequestProcessor<DataStreamsService, GetRecordsRequest, GetRecordsResponse, GetRecordsResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncGetRecords)>>("GetRecords", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncGetRecords);
- Name2Processor["GetShardIterator"] = MakeHolder<THttpRequestProcessor<DataStreamsService, GetShardIteratorRequest, GetShardIteratorResponse, GetShardIteratorResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncGetShardIterator)>>("GetShardIterator", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncGetShardIterator);
- Name2Processor["DescribeLimits"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DescribeLimitsRequest, DescribeLimitsResponse, DescribeLimitsResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeLimits)>>("DescribeLimits", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeLimits);
- Name2Processor["DescribeStreamSummary"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DescribeStreamSummaryRequest, DescribeStreamSummaryResponse, DescribeStreamSummaryResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeStreamSummary)>>("DescribeStreamSummary", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeStreamSummary);
- Name2Processor["DecreaseStreamRetentionPeriod"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DecreaseStreamRetentionPeriodRequest, DecreaseStreamRetentionPeriodResponse, DecreaseStreamRetentionPeriodResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDecreaseStreamRetentionPeriod)>>("DecreaseStreamRetentionPeriod", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDecreaseStreamRetentionPeriod);
- Name2Processor["IncreaseStreamRetentionPeriod"] = MakeHolder<THttpRequestProcessor<DataStreamsService, IncreaseStreamRetentionPeriodRequest, IncreaseStreamRetentionPeriodResponse, IncreaseStreamRetentionPeriodResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncIncreaseStreamRetentionPeriod)>>("IncreaseStreamRetentionPeriod", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncIncreaseStreamRetentionPeriod);
- Name2Processor["UpdateShardCount"] = MakeHolder<THttpRequestProcessor<DataStreamsService, UpdateShardCountRequest, UpdateShardCountResponse, UpdateShardCountResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncUpdateShardCount)>>("UpdateShardCount", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncUpdateShardCount);
- Name2Processor["RegisterStreamConsumer"] = MakeHolder<THttpRequestProcessor<DataStreamsService, RegisterStreamConsumerRequest, RegisterStreamConsumerResponse, RegisterStreamConsumerResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncRegisterStreamConsumer)>>("RegisterStreamConsumer", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncRegisterStreamConsumer);
- Name2Processor["DeregisterStreamConsumer"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DeregisterStreamConsumerRequest, DeregisterStreamConsumerResponse, DeregisterStreamConsumerResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDeregisterStreamConsumer)>>("DeregisterStreamConsumer", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDeregisterStreamConsumer);
- Name2Processor["DescribeStreamConsumer"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DescribeStreamConsumerRequest, DescribeStreamConsumerResponse, DescribeStreamConsumerResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeStreamConsumer)>>("DescribeStreamConsumer", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDescribeStreamConsumer);
- Name2Processor["ListStreamConsumers"] = MakeHolder<THttpRequestProcessor<DataStreamsService, ListStreamConsumersRequest, ListStreamConsumersResponse, ListStreamConsumersResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListStreamConsumers)>>("ListStreamConsumers", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListStreamConsumers);
- Name2Processor["AddTagsToStream"] = MakeHolder<THttpRequestProcessor<DataStreamsService, AddTagsToStreamRequest, AddTagsToStreamResponse, AddTagsToStreamResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncAddTagsToStream)>>("AddTagsToStream", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncAddTagsToStream);
- Name2Processor["DisableEnhancedMonitoring"] = MakeHolder<THttpRequestProcessor<DataStreamsService, DisableEnhancedMonitoringRequest, DisableEnhancedMonitoringResponse, DisableEnhancedMonitoringResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDisableEnhancedMonitoring)>>("DisableEnhancedMonitoring", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncDisableEnhancedMonitoring);
- Name2Processor["EnableEnhancedMonitoring"] = MakeHolder<THttpRequestProcessor<DataStreamsService, EnableEnhancedMonitoringRequest, EnableEnhancedMonitoringResponse, EnableEnhancedMonitoringResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncEnableEnhancedMonitoring)>>("EnableEnhancedMonitoring", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncEnableEnhancedMonitoring);
- Name2Processor["ListTagsForStream"] = MakeHolder<THttpRequestProcessor<DataStreamsService, ListTagsForStreamRequest, ListTagsForStreamResponse, ListTagsForStreamResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListTagsForStream)>>("ListTagsForStream", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncListTagsForStream);
- Name2Processor["MergeShards"] = MakeHolder<THttpRequestProcessor<DataStreamsService, MergeShardsRequest, MergeShardsResponse, MergeShardsResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncMergeShards)>>("MergeShards", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncMergeShards);
- Name2Processor["RemoveTagsFromStream"] = MakeHolder<THttpRequestProcessor<DataStreamsService, RemoveTagsFromStreamRequest, RemoveTagsFromStreamResponse, RemoveTagsFromStreamResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncRemoveTagsFromStream)>>("RemoveTagsFromStream", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncRemoveTagsFromStream);
- Name2Processor["SplitShard"] = MakeHolder<THttpRequestProcessor<DataStreamsService, SplitShardRequest, SplitShardResponse, SplitShardResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncSplitShard)>>("SplitShard", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncSplitShard);
- Name2Processor["StartStreamEncryption"] = MakeHolder<THttpRequestProcessor<DataStreamsService, StartStreamEncryptionRequest, StartStreamEncryptionResponse, StartStreamEncryptionResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncStartStreamEncryption)>>("StartStreamEncryption", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncStartStreamEncryption);
- Name2Processor["StopStreamEncryption"] = MakeHolder<THttpRequestProcessor<DataStreamsService, StopStreamEncryptionRequest, StopStreamEncryptionResponse, StopStreamEncryptionResult,
- decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncStopStreamEncryption)>>("StopStreamEncryption", &Ydb::DataStreams::V1::DataStreamsService::Stub::AsyncStopStreamEncryption);
+ #define DECLARE_PROCESSOR(name) Name2Processor[#name] = MakeHolder<THttpRequestProcessor<DataStreamsService, name##Request, name##Response, name##Result,\
+ decltype(&Ydb::DataStreams::V1::DataStreamsService::Stub::Async##name), NKikimr::NGRpcService::TEvDataStreams##name##Request>> \
+ (#name, &Ydb::DataStreams::V1::DataStreamsService::Stub::Async##name);
+ DECLARE_PROCESSOR(PutRecords);
+ DECLARE_PROCESSOR(CreateStream);
+ DECLARE_PROCESSOR(ListStreams);
+ DECLARE_PROCESSOR(DeleteStream);
+ DECLARE_PROCESSOR(DescribeStream);
+ DECLARE_PROCESSOR(ListShards);
+ DECLARE_PROCESSOR(PutRecord);
+ DECLARE_PROCESSOR(GetRecords);
+ DECLARE_PROCESSOR(GetShardIterator);
+ DECLARE_PROCESSOR(DescribeLimits);
+ DECLARE_PROCESSOR(DescribeStreamSummary);
+ DECLARE_PROCESSOR(DecreaseStreamRetentionPeriod);
+ DECLARE_PROCESSOR(IncreaseStreamRetentionPeriod);
+ DECLARE_PROCESSOR(UpdateShardCount);
+ DECLARE_PROCESSOR(RegisterStreamConsumer);
+ DECLARE_PROCESSOR(DeregisterStreamConsumer);
+ DECLARE_PROCESSOR(DescribeStreamConsumer);
+ DECLARE_PROCESSOR(ListStreamConsumers);
+ DECLARE_PROCESSOR(AddTagsToStream);
+ DECLARE_PROCESSOR(DisableEnhancedMonitoring);
+ DECLARE_PROCESSOR(EnableEnhancedMonitoring);
+ DECLARE_PROCESSOR(ListTagsForStream);
+ DECLARE_PROCESSOR(MergeShards);
+ DECLARE_PROCESSOR(RemoveTagsFromStream);
+ DECLARE_PROCESSOR(SplitShard);
+ DECLARE_PROCESSOR(StartStreamEncryption);
+ DECLARE_PROCESSOR(StopStreamEncryption);
+ #undef DECLARE_PROCESSOR
}
bool THttpRequestProcessors::Execute(const TString& name, THttpRequestContext&& context, THolder<NKikimr::NSQS::TAwsRequestSignV4> signature, const TActorContext& ctx) {
diff --git a/ydb/core/http_proxy/http_req.h b/ydb/core/http_proxy/http_req.h
index 9936e48ea1..9b1f01af3e 100644
--- a/ydb/core/http_proxy/http_req.h
+++ b/ydb/core/http_proxy/http_req.h
@@ -77,6 +77,7 @@ struct THttpRequestContext {
NActors::TActorId Sender;
TString IamToken;
+ TString SerializedUserToken;
const NKikimrConfig::TServerlessProxyConfig& ServiceConfig;
NYdb::TDriver* Driver;
std::shared_ptr<NYdb::ICredentialsProvider> ServiceAccountCredentialsProvider;
diff --git a/ydb/core/http_proxy/http_service.cpp b/ydb/core/http_proxy/http_service.cpp
index ba9e849a18..b1b880d015 100644
--- a/ydb/core/http_proxy/http_service.cpp
+++ b/ydb/core/http_proxy/http_service.cpp
@@ -27,14 +27,16 @@ namespace NKikimr::NHttpProxy {
ServiceAccountCredentialsProvider = cfg.CredentialsProvider;
Processors = MakeHolder<THttpRequestProcessors>();
Processors->Initialize();
- auto config = NYdb::TDriverConfig().SetNetworkThreadsNum(1)
- .SetGRpcKeepAlivePermitWithoutCalls(true)
- .SetGRpcKeepAliveTimeout(TDuration::Seconds(90))
- .SetDiscoveryMode(NYdb::EDiscoveryMode::Async);
- if (Config.GetCaCert()) {
- config.UseSecureConnection(TFileInput(Config.GetCaCert()).ReadAll());
+ if (cfg.UseSDK) {
+ auto config = NYdb::TDriverConfig().SetNetworkThreadsNum(1)
+ .SetGRpcKeepAlivePermitWithoutCalls(true)
+ .SetGRpcKeepAliveTimeout(TDuration::Seconds(90))
+ .SetDiscoveryMode(NYdb::EDiscoveryMode::Async);
+ if (Config.GetCaCert()) {
+ config.UseSecureConnection(TFileInput(Config.GetCaCert()).ReadAll());
+ }
+ Driver = MakeHolder<NYdb::TDriver>(std::move(config));
}
- Driver = MakeHolder<NYdb::TDriver>(std::move(config));
}
void Bootstrap(const TActorContext& ctx) {
diff --git a/ydb/core/http_proxy/http_service.h b/ydb/core/http_proxy/http_service.h
index 52f8a36953..3b22d79cfd 100644
--- a/ydb/core/http_proxy/http_service.h
+++ b/ydb/core/http_proxy/http_service.h
@@ -12,6 +12,7 @@ namespace NKikimr::NHttpProxy {
struct THttpProxyConfig {
NKikimrConfig::TServerlessProxyConfig Config;
std::shared_ptr<NYdb::ICredentialsProvider> CredentialsProvider;
+ bool UseSDK = false;
};
NActors::IActor* CreateHttpProxy(const THttpProxyConfig& config);
diff --git a/ydb/core/http_proxy/ya.make b/ydb/core/http_proxy/ya.make
index c7340e13bb..31054b950a 100644
--- a/ydb/core/http_proxy/ya.make
+++ b/ydb/core/http_proxy/ya.make
@@ -4,8 +4,6 @@ LIBRARY()
SRCS(
- driver_cache_actor.cpp
- driver_cache_actor.h
events.h
http_req.cpp
http_req.h
@@ -26,12 +24,15 @@ PEERDIR(
library/cpp/actors/core
ydb/core/base
ydb/core/protos
+ ydb/core/grpc_services/local_rpc
+ ydb/library/yql/public/issue
ydb/library/http_proxy/authorization
ydb/library/http_proxy/error
ydb/library/naming_conventions
ydb/public/sdk/cpp/client/ydb_datastreams
ydb/public/sdk/cpp/client/ydb_persqueue_core
ydb/public/sdk/cpp/client/ydb_persqueue_public/codecs
+ ydb/services/datastreams
)
YQL_LAST_ABI_VERSION()
diff --git a/ydb/core/testlib/service_mocks/access_service_mock.h b/ydb/core/testlib/service_mocks/access_service_mock.h
index f18ac92f0a..ab19356a16 100644
--- a/ydb/core/testlib/service_mocks/access_service_mock.h
+++ b/ydb/core/testlib/service_mocks/access_service_mock.h
@@ -39,6 +39,7 @@ public:
} else {
key = request->iam_token();
}
+
auto it = AuthenticateData.find(key);
if (it != AuthenticateData.end()) {
response->CopyFrom(it->second.Response);
@@ -52,8 +53,9 @@ public:
virtual grpc::Status Authorize(
grpc::ServerContext* ctx,
const yandex::cloud::priv::servicecontrol::v1::AuthorizeRequest* request,
- yandex::cloud::priv::servicecontrol::v1::AuthorizeResponse* response) override {
- const TString& token = request->subject().user_account().id() + "-" + request->permission() + "-" + request->resource_path(0).id();
+ yandex::cloud::priv::servicecontrol::v1::AuthorizeResponse* response) override
+ {
+ const TString& token = request->signature().access_key_id() + request->iam_token() + "-" + request->permission() + "-" + request->resource_path(0).id();
auto it = AuthorizeData.find(token);
if (it != AuthorizeData.end()) {
response->CopyFrom(it->second.Response);
diff --git a/ydb/core/yq/libs/control_plane_proxy/utils.h b/ydb/core/yq/libs/control_plane_proxy/utils.h
index 2a443edd2d..96e901b5f8 100644
--- a/ydb/core/yq/libs/control_plane_proxy/utils.h
+++ b/ydb/core/yq/libs/control_plane_proxy/utils.h
@@ -39,9 +39,17 @@ inline TString ExtractServiceAccountId(const YandexQuery::TestConnectionRequest&
return ExtractServiceAccountIdImpl(c.setting());
}
-template<typename T>
-TString ExtractServiceAccountId(const T& c) {
+inline TString ExtractServiceAccountId(const YandexQuery::CreateConnectionRequest& c) {
+ return ExtractServiceAccountIdImpl(c.content().setting());
+}
+
+inline TString ExtractServiceAccountId(const YandexQuery::ModifyConnectionRequest& c) {
return ExtractServiceAccountIdImpl(c.content().setting());
}
+template<typename T>
+TString ExtractServiceAccountId(const T&) {
+ return {};
+}
+
} // namespace NYq
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index 7ec3cee164..80477524be 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -3,6 +3,7 @@
#include "shard_iterator.h"
#include "next_token.h"
+#include <ydb/core/grpc_services/service_datastreams.h>
#include <ydb/core/grpc_services/grpc_request_proxy.h>
#include <ydb/core/grpc_services/rpc_deferrable.h>
#include <ydb/core/grpc_services/rpc_scheme_base.h>
@@ -22,14 +23,22 @@ using namespace NKikimrClient;
using grpc::Status;
+
namespace NKikimr::NDataStreams::V1 {
const TString YDS_SERVICE_TYPE = "data-streams";
using namespace NGRpcService;
using namespace NGRpcProxy::V1;
+
namespace {
+ template <class TRequest>
+ const TRequest* GetRequest(NGRpcService::IRequestOpCtx *request)
+ {
+ return dynamic_cast<const TRequest*>(request->GetRequest());
+ }
+
ui32 PartitionWriteSpeedInBytesPerSec(ui32 speedInKbPerSec) {
return speedInKbPerSec == 0 ? 1024 * 1024 : speedInKbPerSec * 1024;
}
@@ -102,10 +111,10 @@ namespace NKikimr::NDataStreams::V1 {
class TCreateStreamActor : public TPQGrpcSchemaBase<TCreateStreamActor, NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest> {
using TBase = TPQGrpcSchemaBase<TCreateStreamActor, TEvDataStreamsCreateStreamRequest>;
+ using TProtoRequest = typename TBase::TProtoRequest;
- TActorId NewSchemeCache;
public:
- TCreateStreamActor(NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest* request, TActorId newSchemeCache);
+ TCreateStreamActor(NKikimr::NGRpcService::IRequestOpCtx* request);
~TCreateStreamActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
@@ -117,11 +126,9 @@ namespace NKikimr::NDataStreams::V1 {
};
- TCreateStreamActor::TCreateStreamActor(NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest* request, TActorId newSchemeCache)
- : TBase(request, request->GetProtoRequest()->stream_name())
- , NewSchemeCache(newSchemeCache)
+ TCreateStreamActor::TCreateStreamActor(NKikimr::NGRpcService::IRequestOpCtx* request)
+ : TBase(request, GetRequest<TProtoRequest>(request)->stream_name())
{
- Y_UNUSED(NewSchemeCache);
}
void TCreateStreamActor::Bootstrap(const NActors::TActorContext& ctx) {
@@ -194,9 +201,10 @@ namespace NKikimr::NDataStreams::V1 {
class TDeleteStreamActor : public TPQGrpcSchemaBase<TDeleteStreamActor, NKikimr::NGRpcService::TEvDataStreamsDeleteStreamRequest> {
using TBase = TPQGrpcSchemaBase<TDeleteStreamActor, TEvDataStreamsDeleteStreamRequest>;
+ using TProtoRequest = typename TBase::TProtoRequest;
public:
- TDeleteStreamActor(NKikimr::NGRpcService::TEvDataStreamsDeleteStreamRequest* request);
+ TDeleteStreamActor(NKikimr::NGRpcService::IRequestOpCtx* request);
~TDeleteStreamActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
@@ -211,9 +219,9 @@ namespace NKikimr::NDataStreams::V1 {
bool EnforceDeletion;
};
- TDeleteStreamActor::TDeleteStreamActor(NKikimr::NGRpcService::TEvDataStreamsDeleteStreamRequest* request)
- : TBase(request, request->GetProtoRequest()->stream_name())
- , EnforceDeletion{request->GetProtoRequest()->enforce_consumer_deletion()}
+ TDeleteStreamActor::TDeleteStreamActor(NKikimr::NGRpcService::IRequestOpCtx* request)
+ : TBase(request, GetRequest<TProtoRequest>(request)->stream_name())
+ , EnforceDeletion{GetProtoRequest()->enforce_consumer_deletion()}
{
}
@@ -256,10 +264,10 @@ namespace NKikimr::NDataStreams::V1 {
class TUpdateShardCountActor : public TUpdateSchemeActor<TUpdateShardCountActor, TEvDataStreamsUpdateShardCountRequest> {
using TBase = TUpdateSchemeActor<TUpdateShardCountActor, TEvDataStreamsUpdateShardCountRequest>;
-
+ using TProtoRequest = typename TBase::TProtoRequest;
public:
- TUpdateShardCountActor(TEvDataStreamsUpdateShardCountRequest* request)
- : TBase(request, request->GetProtoRequest()->stream_name())
+ TUpdateShardCountActor(NKikimr::NGRpcService::IRequestOpCtx* request)
+ : TBase(request, GetRequest<TProtoRequest>(request)->stream_name())
{
}
@@ -296,10 +304,11 @@ namespace NKikimr::NDataStreams::V1 {
class TUpdateStreamActor : public TUpdateSchemeActor<TUpdateStreamActor, TEvDataStreamsUpdateStreamRequest> {
using TBase = TUpdateSchemeActor<TUpdateStreamActor, TEvDataStreamsUpdateStreamRequest>;
+ using TProtoRequest = typename TBase::TProtoRequest;
public:
- TUpdateStreamActor(TEvDataStreamsUpdateStreamRequest* request)
- : TBase(request, request->GetProtoRequest()->stream_name())
+ TUpdateStreamActor(NKikimr::NGRpcService::IRequestOpCtx* request)
+ : TBase(request, GetRequest<TProtoRequest>(request)->stream_name())
{
}
@@ -342,10 +351,11 @@ namespace NKikimr::NDataStreams::V1 {
class TSetWriteQuotaActor : public TUpdateSchemeActor<TSetWriteQuotaActor, TEvDataStreamsSetWriteQuotaRequest> {
using TBase = TUpdateSchemeActor<TSetWriteQuotaActor, TEvDataStreamsSetWriteQuotaRequest>;
+ using TProtoRequest = typename TBase::TProtoRequest;
public:
- TSetWriteQuotaActor(TEvDataStreamsSetWriteQuotaRequest* request)
- : TBase(request, request->GetProtoRequest()->stream_name())
+ TSetWriteQuotaActor(NKikimr::NGRpcService::IRequestOpCtx* request)
+ : TBase(request, GetRequest<TProtoRequest>(request)->stream_name())
{
}
@@ -384,10 +394,11 @@ namespace NKikimr::NDataStreams::V1 {
template<class TEvProto>
class TSetStreamRetentionPeriodActor : public TUpdateSchemeActor<TSetStreamRetentionPeriodActor<TEvProto>, TEvProto> {
using TBase = TUpdateSchemeActor<TSetStreamRetentionPeriodActor<TEvProto>, TEvProto>;
+ using TProtoRequest = typename TBase::TProtoRequest;
public:
- TSetStreamRetentionPeriodActor(TEvProto* request, bool shouldIncrease)
- : TBase(request, request->GetProtoRequest()->stream_name())
+ TSetStreamRetentionPeriodActor(NKikimr::NGRpcService::IRequestOpCtx* request, bool shouldIncrease)
+ : TBase(request, GetRequest<TProtoRequest>(request)->stream_name())
, ShouldIncrease(shouldIncrease)
{
}
@@ -424,9 +435,10 @@ namespace NKikimr::NDataStreams::V1 {
, public TCdcStreamCompatible
{
using TBase = TPQGrpcSchemaBase<TDescribeStreamActor, TEvDataStreamsDescribeStreamRequest>;
+ using TProtoRequest = typename TBase::TProtoRequest;
public:
- TDescribeStreamActor(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamRequest* request);
+ TDescribeStreamActor(NKikimr::NGRpcService::IRequestOpCtx* request);
~TDescribeStreamActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
@@ -473,8 +485,8 @@ namespace NKikimr::NDataStreams::V1 {
std::map<ui64, std::pair<ui64, ui64>> StartEndOffsetsPerPartition;
};
- TDescribeStreamActor::TDescribeStreamActor(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamRequest* request)
- : TBase(request, request->GetProtoRequest()->stream_name())
+ TDescribeStreamActor::TDescribeStreamActor(NKikimr::NGRpcService::IRequestOpCtx* request)
+ : TBase(request, GetRequest<TProtoRequest>(request)->stream_name())
{
}
@@ -594,7 +606,7 @@ namespace NKikimr::NDataStreams::V1 {
using TBase = TRpcSchemeRequestActor<TListStreamsActor, TEvDataStreamsListStreamsRequest>;
public:
- TListStreamsActor(NKikimr::NGRpcService::TEvDataStreamsListStreamsRequest* request, TActorId newSchemeCache);
+ TListStreamsActor(NKikimr::NGRpcService::IRequestOpCtx* request);
~TListStreamsActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
@@ -620,12 +632,10 @@ namespace NKikimr::NDataStreams::V1 {
ui32 RequestsInFlight = 0;
std::vector<std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet>> WaitingList;
std::vector<TString> Topics;
- TActorId NewSchemeCache;
};
- TListStreamsActor::TListStreamsActor(NKikimr::NGRpcService::TEvDataStreamsListStreamsRequest* request, TActorId newSchemeCache)
+ TListStreamsActor::TListStreamsActor(NKikimr::NGRpcService::IRequestOpCtx* request)
: TBase(request)
- , NewSchemeCache(newSchemeCache)
{
}
@@ -649,7 +659,7 @@ namespace NKikimr::NDataStreams::V1 {
void TListStreamsActor::SendPendingRequests(const TActorContext& ctx) {
if (RequestsInFlight < MAX_IN_FLIGHT && WaitingList.size() > 0) {
- ctx.Send(NewSchemeCache, WaitingList.back().release());
+ ctx.Send(MakeSchemeCacheID(), WaitingList.back().release());
WaitingList.pop_back();
RequestsInFlight++;
}
@@ -749,9 +759,10 @@ namespace NKikimr::NDataStreams::V1 {
, public TCdcStreamCompatible
{
using TBase = TPQGrpcSchemaBase<TListStreamConsumersActor, TEvDataStreamsListStreamConsumersRequest>;
+ using TProtoRequest = typename TBase::TProtoRequest;
public:
- TListStreamConsumersActor(NKikimr::NGRpcService::TEvDataStreamsListStreamConsumersRequest* request);
+ TListStreamConsumersActor(NKikimr::NGRpcService::IRequestOpCtx* request);
~TListStreamConsumersActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
@@ -770,15 +781,15 @@ namespace NKikimr::NDataStreams::V1 {
ui32 MaxResults = DEFAULT_MAX_RESULTS;
TNextToken NextToken;
};
- TListStreamConsumersActor::TListStreamConsumersActor(NKikimr::NGRpcService::TEvDataStreamsListStreamConsumersRequest* request)
- : TBase(request, TNextToken(request->GetProtoRequest()->next_token()).IsValid() ?
- TNextToken(request->GetProtoRequest()->next_token()).GetStreamArn() :
- request->GetProtoRequest()->stream_arn())
- , NextToken(request->GetProtoRequest()->next_token())
+ TListStreamConsumersActor::TListStreamConsumersActor(NKikimr::NGRpcService::IRequestOpCtx* request)
+ : TBase(request, TNextToken(GetRequest<TProtoRequest>(request)->next_token()).IsValid() ?
+ TNextToken(GetRequest<TProtoRequest>(request)->next_token()).GetStreamArn() :
+ GetRequest<TProtoRequest>(request)->stream_arn())
+ , NextToken(GetRequest<TProtoRequest>(request)->next_token())
{
- if (request->GetProtoRequest()->next_token().empty()) {
- StreamArn = request->GetProtoRequest()->stream_arn();
- MaxResults = request->GetProtoRequest()->max_results();
+ if (GetProtoRequest()->next_token().empty()) {
+ StreamArn = GetProtoRequest()->stream_arn();
+ MaxResults = GetProtoRequest()->max_results();
NextToken = TNextToken(StreamArn, 0, MaxResults, TInstant::Now().MilliSeconds());
} else {
StreamArn = NextToken.GetStreamArn();
@@ -871,9 +882,10 @@ namespace NKikimr::NDataStreams::V1 {
, public TCdcStreamCompatible
{
using TBase = TUpdateSchemeActor<TRegisterStreamConsumerActor, TEvDataStreamsRegisterStreamConsumerRequest>;
+ using TProtoRequest = typename TBase::TProtoRequest;
public:
- TRegisterStreamConsumerActor(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest* request);
+ TRegisterStreamConsumerActor(NKikimr::NGRpcService::IRequestOpCtx* request);
~TRegisterStreamConsumerActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
@@ -886,9 +898,9 @@ namespace NKikimr::NDataStreams::V1 {
private:
TString ConsumerName;
};
- TRegisterStreamConsumerActor::TRegisterStreamConsumerActor(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest* request)
- : TBase(request, request->GetProtoRequest()->stream_arn())
- , ConsumerName(request->GetProtoRequest()->consumer_name())
+ TRegisterStreamConsumerActor::TRegisterStreamConsumerActor(NKikimr::NGRpcService::IRequestOpCtx* request)
+ : TBase(request, GetRequest<TProtoRequest>(request)->stream_arn())
+ , ConsumerName(GetRequest<TProtoRequest>(request)->consumer_name())
{
}
@@ -948,9 +960,10 @@ namespace NKikimr::NDataStreams::V1 {
, public TCdcStreamCompatible
{
using TBase = TUpdateSchemeActor<TDeregisterStreamConsumerActor, TEvDataStreamsDeregisterStreamConsumerRequest>;
+ using TProtoRequest = typename TBase::TProtoRequest;
public:
- TDeregisterStreamConsumerActor(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest* request);
+ TDeregisterStreamConsumerActor(NKikimr::NGRpcService::IRequestOpCtx* request);
~TDeregisterStreamConsumerActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
@@ -962,9 +975,9 @@ namespace NKikimr::NDataStreams::V1 {
private:
TString ConsumerName;
};
- TDeregisterStreamConsumerActor::TDeregisterStreamConsumerActor(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest* request)
- : TBase(request, request->GetProtoRequest()->stream_arn())
- , ConsumerName(request->GetProtoRequest()->consumer_name())
+ TDeregisterStreamConsumerActor::TDeregisterStreamConsumerActor(NKikimr::NGRpcService::IRequestOpCtx* request)
+ : TBase(request, GetRequest<TProtoRequest>(request)->stream_arn())
+ , ConsumerName(GetRequest<TProtoRequest>(request)->consumer_name())
{
}
@@ -998,9 +1011,10 @@ namespace NKikimr::NDataStreams::V1 {
, public TCdcStreamCompatible
{
using TBase = TPQGrpcSchemaBase<TGetShardIteratorActor, TEvDataStreamsGetShardIteratorRequest>;
+ using TProtoRequest = typename TBase::TProtoRequest;
public:
- TGetShardIteratorActor(NKikimr::NGRpcService::TEvDataStreamsGetShardIteratorRequest* request, NActors::TActorId newSchemeCache);
+ TGetShardIteratorActor(NKikimr::NGRpcService::IRequestOpCtx* request);
~TGetShardIteratorActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
@@ -1014,7 +1028,6 @@ namespace NKikimr::NDataStreams::V1 {
void SendResponse(const TActorContext& ctx, const TShardIterator& shardIt);
std::optional<ui32> SequenceNumberToInt(const TString& sequenceNumberStr);
- TActorId NewSchemeCache;
TString StreamName;
TString ShardId;
TIteratorType IteratorType;
@@ -1022,12 +1035,11 @@ namespace NKikimr::NDataStreams::V1 {
ui64 ReadTimestampMs;
};
- TGetShardIteratorActor::TGetShardIteratorActor(NKikimr::NGRpcService::TEvDataStreamsGetShardIteratorRequest* request, NActors::TActorId newSchemeCache)
- : TBase(request, request->GetProtoRequest()->stream_name())
- , NewSchemeCache(std::move(newSchemeCache))
- , StreamName{request->GetProtoRequest()->stream_name()}
- , ShardId{request->GetProtoRequest()->shard_id()}
- , IteratorType{request->GetProtoRequest()->shard_iterator_type()}
+ TGetShardIteratorActor::TGetShardIteratorActor(NKikimr::NGRpcService::IRequestOpCtx* request)
+ : TBase(request, GetRequest<TProtoRequest>(request)->stream_name())
+ , StreamName{GetRequest<TProtoRequest>(request)->stream_name()}
+ , ShardId{GetRequest<TProtoRequest>(request)->shard_id()}
+ , IteratorType{GetRequest<TProtoRequest>(request)->shard_iterator_type()}
, SequenceNumber{0}
, ReadTimestampMs{0}
{
@@ -1140,12 +1152,13 @@ namespace NKikimr::NDataStreams::V1 {
, public TCdcStreamCompatible
{
using TBase = TPQGrpcSchemaBase<TGetRecordsActor, TEvDataStreamsGetRecordsRequest>;
+ using TProtoRequest = typename TBase::TProtoRequest;
static constexpr ui32 READ_TIMEOUT_MS = 150;
static constexpr i32 MAX_LIMIT = 10000;
public:
- TGetRecordsActor(TEvDataStreamsGetRecordsRequest* request, const TActorId& newSchemeCache);
+ TGetRecordsActor(NKikimr::NGRpcService::IRequestOpCtx* request);
~TGetRecordsActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
@@ -1166,20 +1179,17 @@ namespace NKikimr::NDataStreams::V1 {
TString StreamName;
ui64 TabletId;
i32 Limit;
- TActorId NewSchemeCache;
TActorId PipeClient;
};
- TGetRecordsActor::TGetRecordsActor(TEvDataStreamsGetRecordsRequest* request,
- const TActorId& newSchemeCache)
- : TBase(request, TShardIterator(request->GetProtoRequest()->shard_iterator()).IsValid()
- ? TShardIterator(request->GetProtoRequest()->shard_iterator()).GetStreamName()
+ TGetRecordsActor::TGetRecordsActor(NKikimr::NGRpcService::IRequestOpCtx* request)
+ : TBase(request, TShardIterator(GetRequest<TProtoRequest>(request)->shard_iterator()).IsValid()
+ ? TShardIterator(GetRequest<TProtoRequest>(request)->shard_iterator()).GetStreamName()
: "undefined")
- , ShardIterator{request->GetProtoRequest()->shard_iterator()}
+ , ShardIterator{GetRequest<TProtoRequest>(request)->shard_iterator()}
, StreamName{ShardIterator.IsValid() ? ShardIterator.GetStreamName() : "undefined"}
, TabletId{0}
- , Limit{request->GetProtoRequest()->limit()}
- , NewSchemeCache{newSchemeCache}
+ , Limit{GetRequest<TProtoRequest>(request)->limit()}
{
}
@@ -1363,9 +1373,10 @@ namespace NKikimr::NDataStreams::V1 {
, public TCdcStreamCompatible
{
using TBase = TPQGrpcSchemaBase<TListShardsActor, TEvDataStreamsListShardsRequest>;
+ using TProtoRequest = typename TBase::TProtoRequest;
public:
- TListShardsActor(NKikimr::NGRpcService::TEvDataStreamsListShardsRequest* request, NActors::TActorId newSchemeCache);
+ TListShardsActor(NKikimr::NGRpcService::IRequestOpCtx* request);
~TListShardsActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
@@ -1386,7 +1397,6 @@ namespace NKikimr::NDataStreams::V1 {
static constexpr ui32 MIN_MAX_RESULTS = 1;
static constexpr ui32 DEFAULT_MAX_RESULTS = 100;
- TActorId NewSchemeCache;
TString StreamName;
TShardFilter ShardFilter;
TNextToken NextToken;
@@ -1399,17 +1409,16 @@ namespace NKikimr::NDataStreams::V1 {
std::vector<TActorId> Pipes;
};
- TListShardsActor::TListShardsActor(NKikimr::NGRpcService::TEvDataStreamsListShardsRequest* request, NActors::TActorId newSchemeCache)
- : TBase(request, request->GetProtoRequest()->stream_name())
- , NewSchemeCache(std::move(newSchemeCache))
- , StreamName{request->GetProtoRequest()->stream_name()}
- , ShardFilter{request->GetProtoRequest()->shard_filter()}
- , NextToken{request->GetProtoRequest()->next_token()}
+ TListShardsActor::TListShardsActor(NKikimr::NGRpcService::IRequestOpCtx* request)
+ : TBase(request, GetRequest<TProtoRequest>(request)->stream_name())
+ , StreamName{GetProtoRequest()->stream_name()}
+ , ShardFilter{GetProtoRequest()->shard_filter()}
+ , NextToken{GetProtoRequest()->next_token()}
, GotOffsetResponds{0}
{
- if (request->GetProtoRequest()->next_token().empty()) {
- StreamName = request->GetProtoRequest()->stream_name();
- MaxResults = request->GetProtoRequest()->max_results();
+ if (GetProtoRequest()->next_token().empty()) {
+ StreamName = GetProtoRequest()->stream_name();
+ MaxResults = GetProtoRequest()->max_results();
NextToken = TNextToken(StreamName, 0, MaxResults, TInstant::Now().MilliSeconds());
} else {
StreamName = NextToken.GetStreamArn();
@@ -1633,9 +1642,10 @@ namespace NKikimr::NDataStreams::V1 {
class TDescribeStreamSummaryActor : public TPQGrpcSchemaBase<TDescribeStreamSummaryActor, TEvDataStreamsDescribeStreamSummaryRequest> {
using TBase = TPQGrpcSchemaBase<TDescribeStreamSummaryActor, TEvDataStreamsDescribeStreamSummaryRequest>;
+ using TProtoRequest = typename TBase::TProtoRequest;
public:
- TDescribeStreamSummaryActor(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamSummaryRequest* request);
+ TDescribeStreamSummaryActor(NKikimr::NGRpcService::IRequestOpCtx* request);
~TDescribeStreamSummaryActor() = default;
void Bootstrap(const NActors::TActorContext& ctx);
@@ -1652,9 +1662,9 @@ namespace NKikimr::NDataStreams::V1 {
};
TDescribeStreamSummaryActor::TDescribeStreamSummaryActor(
- NKikimr::NGRpcService::TEvDataStreamsDescribeStreamSummaryRequest* request
+ NKikimr::NGRpcService::IRequestOpCtx* request
)
- : TBase(request, request->GetProtoRequest()->stream_name())
+ : TBase(request, GetRequest<TProtoRequest>(request)->stream_name())
{
}
@@ -1723,7 +1733,7 @@ namespace NKikimr::NDataStreams::V1 {
using TBase = TRpcSchemeRequestActor<TNotImplementedRequestActor, TEvRequest>;
public:
- TNotImplementedRequestActor(TEvRequest* request)
+ TNotImplementedRequestActor(NKikimr::NGRpcService::IRequestOpCtx* request)
: TBase(request)
{
}
@@ -1736,258 +1746,77 @@ namespace NKikimr::NDataStreams::V1 {
this->Die(ctx);
}
};
-
- //-----------------------------------------------------------------------------------
-
- IActor* CreateDataStreamsService(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TActorId newSchemeCache) {
- return new TDataStreamsService(counters, newSchemeCache);
- }
-
- TDataStreamsService::TDataStreamsService(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TActorId newSchemeCache)
- : Counters(counters)
- , NewSchemeCache(newSchemeCache)
- {
- }
-
- void TDataStreamsService::Bootstrap(const TActorContext&) {
- Become(&TThis::StateFunc);
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TCreateStreamActor(ev->Release().Release(), NewSchemeCache));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsDeleteStreamRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TDeleteStreamActor(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TDescribeStreamActor(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TRegisterStreamConsumerActor(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TDeregisterStreamConsumerActor(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamConsumerRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsDescribeStreamConsumerRequest>(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsPutRecordRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TPutRecordActor(ev->Release().Release(), NewSchemeCache));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsListStreamsRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TListStreamsActor(ev->Release().Release(), NewSchemeCache));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsListShardsRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TListShardsActor(ev->Release().Release(), NewSchemeCache));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TPutRecordsActor(ev->Release().Release(), NewSchemeCache));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsGetRecordsRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TGetRecordsActor(ev->Release().Release(), NewSchemeCache));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsGetShardIteratorRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TGetShardIteratorActor(ev->Release().Release(), NewSchemeCache));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsSubscribeToShardRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsSubscribeToShardRequest>(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeLimitsRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsDescribeLimitsRequest>(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamSummaryRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TDescribeStreamSummaryActor(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(TEvDataStreamsDecreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TSetStreamRetentionPeriodActor<TEvDataStreamsDecreaseStreamRetentionPeriodRequest>(ev->Release().Release(), false));
- }
-
- void TDataStreamsService::Handle(TEvDataStreamsIncreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TSetStreamRetentionPeriodActor<TEvDataStreamsIncreaseStreamRetentionPeriodRequest>(ev->Release().Release(), true));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsUpdateShardCountRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TUpdateShardCountActor(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsUpdateStreamRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TUpdateStreamActor(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsListStreamConsumersRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TListStreamConsumersActor(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsAddTagsToStreamRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsAddTagsToStreamRequest>(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsDisableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsDisableEnhancedMonitoringRequest>(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsEnableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsEnableEnhancedMonitoringRequest>(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsListTagsForStreamRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsListTagsForStreamRequest>(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsMergeShardsRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsMergeShardsRequest>(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsRemoveTagsFromStreamRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsRemoveTagsFromStreamRequest>(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsSplitShardRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsSplitShardRequest>(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsStartStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsStartStreamEncryptionRequest>(ev->Release().Release()));
- }
-
- void TDataStreamsService::Handle(NKikimr::NGRpcService::TEvDataStreamsStopStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreamsStopStreamEncryptionRequest>(ev->Release().Release()));
- }
-
-}
-
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDeleteStreamRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
+namespace NKikimr::NGRpcService {
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsPutRecordRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
+using namespace NDataStreams::V1;
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
+#define DECLARE_RPC(name) template<> IActor* TEvDataStreams##name##Request::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { \
+ return new T##name##Actor(msg);\
+}\
+void DoDataStreams##name##Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {\
+ TActivationContext::AsActorContext().Register(new T##name##Actor(p.release())); \
}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
+#define DECLARE_RPC_NI(name) template<> IActor* TEvDataStreams##name##Request::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { \
+ return new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreams##name##Request>(msg);\
+}\
+void DoDataStreams##name##Request(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {\
+ TActivationContext::AsActorContext().Register(new TNotImplementedRequestActor<NKikimr::NGRpcService::TEvDataStreams##name##Request>(p.release()));\
}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamConsumerRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsListStreamsRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
+DECLARE_RPC(CreateStream);
+DECLARE_RPC(DeleteStream);
+DECLARE_RPC(DescribeStream);
+DECLARE_RPC(PutRecord);
+DECLARE_RPC(RegisterStreamConsumer);
+DECLARE_RPC(DeregisterStreamConsumer);
+DECLARE_RPC_NI(DescribeStreamConsumer);
+DECLARE_RPC(ListStreams);
+DECLARE_RPC(ListShards);
+DECLARE_RPC(PutRecords);
+DECLARE_RPC(GetRecords);
+DECLARE_RPC(GetShardIterator);
+DECLARE_RPC_NI(SubscribeToShard);
+DECLARE_RPC_NI(DescribeLimits);
+DECLARE_RPC(DescribeStreamSummary);
+DECLARE_RPC(UpdateShardCount);
+DECLARE_RPC(ListStreamConsumers);
+DECLARE_RPC_NI(AddTagsToStream);
+DECLARE_RPC_NI(DisableEnhancedMonitoring);
+DECLARE_RPC_NI(EnableEnhancedMonitoring);
+DECLARE_RPC_NI(ListTagsForStream);
+DECLARE_RPC(UpdateStream);
+DECLARE_RPC(SetWriteQuota);
+DECLARE_RPC_NI(MergeShards);
+DECLARE_RPC_NI(RemoveTagsFromStream);
+DECLARE_RPC_NI(SplitShard);
+DECLARE_RPC_NI(StartStreamEncryption);
+DECLARE_RPC_NI(StopStreamEncryption);
+
+
+
+void DoDataStreamsDecreaseStreamRetentionPeriodRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ auto* req = dynamic_cast<TEvDataStreamsDecreaseStreamRetentionPeriodRequest*>(p.release());
+ Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper");
+ TActivationContext::AsActorContext().Register(new TSetStreamRetentionPeriodActor<TEvDataStreamsDecreaseStreamRetentionPeriodRequest>(req, false));
}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsListShardsRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
+template<>
+IActor* TEvDataStreamsDecreaseStreamRetentionPeriodRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
+ return new TSetStreamRetentionPeriodActor<TEvDataStreamsDecreaseStreamRetentionPeriodRequest>(dynamic_cast<TEvDataStreamsDecreaseStreamRetentionPeriodRequest*>(msg), false);
}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
+void DoDataStreamsIncreaseStreamRetentionPeriodRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ auto* req = dynamic_cast<TEvDataStreamsIncreaseStreamRetentionPeriodRequest*>(p.release());
+ Y_VERIFY(req != nullptr, "Wrong using of TGRpcRequestWrapper");
+ TActivationContext::AsActorContext().Register(new TSetStreamRetentionPeriodActor<TEvDataStreamsIncreaseStreamRetentionPeriodRequest>(req, true));
}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsGetRecordsRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsGetShardIteratorRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsSubscribeToShardRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeLimitsRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
+template<>
+IActor* TEvDataStreamsIncreaseStreamRetentionPeriodRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
+ return new TSetStreamRetentionPeriodActor<TEvDataStreamsIncreaseStreamRetentionPeriodRequest>(dynamic_cast<TEvDataStreamsIncreaseStreamRetentionPeriodRequest*>(msg), true);
}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamSummaryRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDecreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsIncreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsUpdateShardCountRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsListStreamConsumersRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsAddTagsToStreamRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsDisableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsEnableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsListTagsForStreamRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsUpdateStreamRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsSetWriteQuotaRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsMergeShardsRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsRemoveTagsFromStreamRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsSplitShardRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsStartStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
-}
-void NKikimr::NGRpcService::TGRpcRequestProxy::Handle(NKikimr::NGRpcService::TEvDataStreamsStopStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx) {
- ctx.Send(NKikimr::NDataStreams::V1::GetDataStreamsServiceActorID(), ev->Release().Release());
}
diff --git a/ydb/services/datastreams/datastreams_proxy.h b/ydb/services/datastreams/datastreams_proxy.h
index 1712e171ee..72b9e1ae4b 100644
--- a/ydb/services/datastreams/datastreams_proxy.h
+++ b/ydb/services/datastreams/datastreams_proxy.h
@@ -2,97 +2,48 @@
#include "events.h"
+#include <ydb/public/api/grpc/draft/ydb_datastreams_v1.pb.h>
+
#include <ydb/core/client/server/grpc_base.h>
#include <ydb/core/grpc_services/rpc_calls.h>
#include <library/cpp/grpc/server/grpc_server.h>
-
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/actorsystem.h>
-namespace NKikimr::NDataStreams::V1 {
-
- inline TActorId GetDataStreamsServiceActorID() {
- return TActorId(0, "PqDsProxy");
- }
-
- IActor* CreateDataStreamsService(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TActorId newSchemeCache);
-
- class TDataStreamsService : public NActors::TActorBootstrapped<TDataStreamsService> {
- public:
- TDataStreamsService(TIntrusivePtr<NMonitoring::TDynamicCounters> counters, TActorId newSchemeCache);
-
- void Bootstrap(const TActorContext& ctx);
-
- private:
- STFUNC(StateFunc) {
- switch (ev->GetTypeRewrite()) {
- HFunc(NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsPutRecordRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsDeleteStreamRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamConsumerRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsListStreamsRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsListShardsRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsGetRecordsRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsGetShardIteratorRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsSubscribeToShardRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsDescribeLimitsRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamSummaryRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsDecreaseStreamRetentionPeriodRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsIncreaseStreamRetentionPeriodRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsUpdateShardCountRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsListStreamConsumersRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsAddTagsToStreamRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsDisableEnhancedMonitoringRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsEnableEnhancedMonitoringRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsListTagsForStreamRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsMergeShardsRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsRemoveTagsFromStreamRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsSplitShardRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsStartStreamEncryptionRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsStopStreamEncryptionRequest, Handle);
- HFunc(NKikimr::NGRpcService::TEvDataStreamsUpdateStreamRequest, Handle);
- }
- }
-
- void Handle(NKikimr::NGRpcService::TEvDataStreamsCreateStreamRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsDeleteStreamRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsPutRecordRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsRegisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsDeregisterStreamConsumerRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamConsumerRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsListStreamsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsListShardsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsGetRecordsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsGetShardIteratorRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsSubscribeToShardRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeLimitsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsDescribeStreamSummaryRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsDecreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsIncreaseStreamRetentionPeriodRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsUpdateShardCountRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsUpdateStreamRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsListStreamConsumersRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsAddTagsToStreamRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsDisableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsEnableEnhancedMonitoringRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsListTagsForStreamRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsMergeShardsRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsRemoveTagsFromStreamRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsSplitShardRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsStartStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx);
- void Handle(NKikimr::NGRpcService::TEvDataStreamsStopStreamEncryptionRequest::TPtr& ev, const TActorContext& ctx);
-
- private:
- TIntrusivePtr<NMonitoring::TDynamicCounters> Counters;
-
- TActorId NewSchemeCache;
- };
+namespace NKikimr {
+namespace NGRpcService {
+
+using TEvDataStreamsCreateStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::CreateStreamRequest, Ydb::DataStreams::V1::CreateStreamResponse>;
+using TEvDataStreamsDeleteStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DeleteStreamRequest, Ydb::DataStreams::V1::DeleteStreamResponse>;
+using TEvDataStreamsDescribeStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DescribeStreamRequest, Ydb::DataStreams::V1::DescribeStreamResponse>;
+using TEvDataStreamsRegisterStreamConsumerRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::RegisterStreamConsumerRequest, Ydb::DataStreams::V1::RegisterStreamConsumerResponse>;
+using TEvDataStreamsDeregisterStreamConsumerRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DeregisterStreamConsumerRequest, Ydb::DataStreams::V1::DeregisterStreamConsumerResponse>;
+using TEvDataStreamsDescribeStreamConsumerRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DescribeStreamConsumerRequest, Ydb::DataStreams::V1::DescribeStreamConsumerResponse>;
+using TEvDataStreamsPutRecordRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::PutRecordRequest, Ydb::DataStreams::V1::PutRecordResponse>;
+using TEvDataStreamsListStreamsRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::ListStreamsRequest, Ydb::DataStreams::V1::ListStreamsResponse>;
+using TEvDataStreamsListShardsRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::ListShardsRequest, Ydb::DataStreams::V1::ListShardsResponse>;
+using TEvDataStreamsPutRecordsRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::PutRecordsRequest, Ydb::DataStreams::V1::PutRecordsResponse>;
+using TEvDataStreamsGetRecordsRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::GetRecordsRequest, Ydb::DataStreams::V1::GetRecordsResponse>;
+using TEvDataStreamsGetShardIteratorRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::GetShardIteratorRequest, Ydb::DataStreams::V1::GetShardIteratorResponse>;
+using TEvDataStreamsSubscribeToShardRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::SubscribeToShardRequest, Ydb::DataStreams::V1::SubscribeToShardResponse>;
+using TEvDataStreamsDescribeLimitsRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DescribeLimitsRequest, Ydb::DataStreams::V1::DescribeLimitsResponse>;
+using TEvDataStreamsDescribeStreamSummaryRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DescribeStreamSummaryRequest, Ydb::DataStreams::V1::DescribeStreamSummaryResponse>;
+using TEvDataStreamsDecreaseStreamRetentionPeriodRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DecreaseStreamRetentionPeriodRequest, Ydb::DataStreams::V1::DecreaseStreamRetentionPeriodResponse>;
+using TEvDataStreamsIncreaseStreamRetentionPeriodRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::IncreaseStreamRetentionPeriodRequest, Ydb::DataStreams::V1::IncreaseStreamRetentionPeriodResponse>;
+using TEvDataStreamsUpdateShardCountRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::UpdateShardCountRequest, Ydb::DataStreams::V1::UpdateShardCountResponse>;
+using TEvDataStreamsUpdateStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::UpdateStreamRequest, Ydb::DataStreams::V1::UpdateStreamResponse>;
+using TEvDataStreamsSetWriteQuotaRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::SetWriteQuotaRequest, Ydb::DataStreams::V1::SetWriteQuotaResponse>;
+using TEvDataStreamsListStreamConsumersRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::ListStreamConsumersRequest, Ydb::DataStreams::V1::ListStreamConsumersResponse>;
+using TEvDataStreamsAddTagsToStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::AddTagsToStreamRequest, Ydb::DataStreams::V1::AddTagsToStreamResponse>;
+using TEvDataStreamsDisableEnhancedMonitoringRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::DisableEnhancedMonitoringRequest, Ydb::DataStreams::V1::DisableEnhancedMonitoringResponse>;
+using TEvDataStreamsEnableEnhancedMonitoringRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::EnableEnhancedMonitoringRequest, Ydb::DataStreams::V1::EnableEnhancedMonitoringResponse>;
+using TEvDataStreamsListTagsForStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::ListTagsForStreamRequest, Ydb::DataStreams::V1::ListTagsForStreamResponse>;
+using TEvDataStreamsMergeShardsRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::MergeShardsRequest, Ydb::DataStreams::V1::MergeShardsResponse>;
+using TEvDataStreamsRemoveTagsFromStreamRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::RemoveTagsFromStreamRequest, Ydb::DataStreams::V1::RemoveTagsFromStreamResponse>;
+using TEvDataStreamsSplitShardRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::SplitShardRequest, Ydb::DataStreams::V1::SplitShardResponse>;
+using TEvDataStreamsStartStreamEncryptionRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::StartStreamEncryptionRequest, Ydb::DataStreams::V1::StartStreamEncryptionResponse>;
+using TEvDataStreamsStopStreamEncryptionRequest = TGrpcRequestOperationCall<Ydb::DataStreams::V1::StopStreamEncryptionRequest, Ydb::DataStreams::V1::StopStreamEncryptionResponse>;
}
+}
diff --git a/ydb/services/datastreams/grpc_service.cpp b/ydb/services/datastreams/grpc_service.cpp
index 7a18cd4dc6..564796ff94 100644
--- a/ydb/services/datastreams/grpc_service.cpp
+++ b/ydb/services/datastreams/grpc_service.cpp
@@ -1,14 +1,40 @@
#include "grpc_service.h"
-#include "datastreams_proxy.h"
-
+#include <ydb/core/grpc_services/service_datastreams.h>
+#include <ydb/core/base/counters.h>
#include <ydb/core/base/appdata.h>
-
+#include <ydb/core/base/ticket_parser.h>
#include <ydb/core/grpc_services/grpc_helper.h>
-#include <ydb/core/grpc_services/rpc_calls.h>
-
+#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/tx/scheme_board/cache.h>
+namespace {
+
+using namespace NKikimr;
+
+void YdsProcessAttr(const TSchemeBoardEvents::TDescribeSchemeResult& schemeData, NGRpcService::ICheckerIface* checker) {
+ static const std::vector<TString> allowedAttributes = {"folder_id", "service_account_id", "database_id"};
+ //full list of permissions for compatibility. remove old permissions later.
+ static const TVector<TString> permissions = {
+ "ydb.streams.write",
+ "ydb.databases.list",
+ "ydb.databases.create",
+ "ydb.databases.connect"
+ };
+ TVector<std::pair<TString, TString>> attributes;
+ attributes.reserve(schemeData.GetPathDescription().UserAttributesSize());
+ for (const auto& attr : schemeData.GetPathDescription().GetUserAttributes()) {
+ if (std::find(allowedAttributes.begin(), allowedAttributes.end(), attr.GetKey()) != allowedAttributes.end()) {
+ attributes.emplace_back(attr.GetKey(), attr.GetValue());
+ }
+ }
+ if (!attributes.empty()) {
+ checker->SetEntries({{permissions, attributes}});
+ }
+}
+
+}
+
namespace NKikimr::NGRpcService {
TGRpcDataStreamsService::TGRpcDataStreamsService(NActors::TActorSystem *system,
@@ -24,11 +50,6 @@ void TGRpcDataStreamsService::InitService(grpc::ServerCompletionQueue *cq, NGrpc
{
CQ_ = cq;
- InitNewSchemeCache();
- IActor *proxyService = NDataStreams::V1::CreateDataStreamsService(Counters_, NewSchemeCache);
- auto actorId = ActorSystem_->Register(proxyService, TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
- ActorSystem_->RegisterLocalService(NDataStreams::V1::GetDataStreamsServiceActorID(), actorId);
-
SetupIncomingRequests(logger);
}
@@ -45,118 +66,57 @@ void TGRpcDataStreamsService::DecRequest() {
Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
}
-void TGRpcDataStreamsService::InitNewSchemeCache() {
- auto appData = ActorSystem_->AppData<TAppData>();
- auto cacheCounters = GetServiceCounters(Counters_, "pqproxy|schemecache");
- auto cacheConfig = MakeIntrusive<NSchemeCache::TSchemeCacheConfig>(appData, cacheCounters);
- NewSchemeCache = ActorSystem_->Register(CreateSchemeBoardSchemeCache(cacheConfig.Get()),
- TMailboxType::HTSwap, ActorSystem_->AppData<TAppData>()->UserPoolId);
-}
void TGRpcDataStreamsService::SetupIncomingRequests(NGrpc::TLoggerPtr logger)
{
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
+ using std::placeholders::_1;
+ using std::placeholders::_2;
+
#ifdef ADD_REQUEST
#error ADD_REQUEST macro already defined
#endif
-#define ADD_REQUEST(NAME, IN, OUT, ACTION) \
- MakeIntrusive<TGRpcRequest<Ydb::DataStreams::V1::IN, Ydb::DataStreams::V1::OUT, TGRpcDataStreamsService>>(this, &Service_, CQ_, \
- [this](NGrpc::IRequestContextBase *ctx) { \
- ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
- ACTION; \
- }, &Ydb::DataStreams::V1::DataStreamsService::AsyncService::Request ## NAME, \
- #NAME, logger, getCounterBlock("data_streams", #NAME))->Run();
-
- ADD_REQUEST(DescribeStream, DescribeStreamRequest, DescribeStreamResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDescribeStreamRequest(ctx));
- })
- ADD_REQUEST(CreateStream, CreateStreamRequest, CreateStreamResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsCreateStreamRequest(ctx));
- })
- ADD_REQUEST(ListStreams, ListStreamsRequest, ListStreamsResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsListStreamsRequest(ctx));
- })
- ADD_REQUEST(DeleteStream, DeleteStreamRequest, DeleteStreamResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDeleteStreamRequest(ctx));
- })
- ADD_REQUEST(ListShards, ListShardsRequest, ListShardsResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsListShardsRequest(ctx));
- })
- ADD_REQUEST(PutRecord, PutRecordRequest, PutRecordResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsPutRecordRequest(ctx));
- })
- ADD_REQUEST(PutRecords, PutRecordsRequest, PutRecordsResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsPutRecordsRequest(ctx));
- })
- ADD_REQUEST(GetRecords, GetRecordsRequest, GetRecordsResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsGetRecordsRequest(ctx));
- })
- ADD_REQUEST(GetShardIterator, GetShardIteratorRequest, GetShardIteratorResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsGetShardIteratorRequest(ctx));
- })
- ADD_REQUEST(SubscribeToShard, SubscribeToShardRequest, SubscribeToShardResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsSubscribeToShardRequest(ctx));
- })
- ADD_REQUEST(DescribeLimits, DescribeLimitsRequest, DescribeLimitsResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDescribeLimitsRequest(ctx));
- })
- ADD_REQUEST(DescribeStreamSummary, DescribeStreamSummaryRequest, DescribeStreamSummaryResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDescribeStreamSummaryRequest(ctx));
- })
- ADD_REQUEST(DecreaseStreamRetentionPeriod, DecreaseStreamRetentionPeriodRequest, DecreaseStreamRetentionPeriodResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDecreaseStreamRetentionPeriodRequest(ctx));
- })
- ADD_REQUEST(IncreaseStreamRetentionPeriod, IncreaseStreamRetentionPeriodRequest, IncreaseStreamRetentionPeriodResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsIncreaseStreamRetentionPeriodRequest(ctx));
- })
- ADD_REQUEST(UpdateShardCount, UpdateShardCountRequest, UpdateShardCountResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsUpdateShardCountRequest(ctx));
- })
- ADD_REQUEST(RegisterStreamConsumer, RegisterStreamConsumerRequest, RegisterStreamConsumerResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsRegisterStreamConsumerRequest(ctx));
- })
- ADD_REQUEST(DeregisterStreamConsumer, DeregisterStreamConsumerRequest, DeregisterStreamConsumerResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDeregisterStreamConsumerRequest(ctx));
- })
- ADD_REQUEST(DescribeStreamConsumer, DescribeStreamConsumerRequest, DescribeStreamConsumerResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDescribeStreamConsumerRequest(ctx));
- })
- ADD_REQUEST(ListStreamConsumers, ListStreamConsumersRequest, ListStreamConsumersResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsListStreamConsumersRequest(ctx));
- })
- ADD_REQUEST(AddTagsToStream, AddTagsToStreamRequest, AddTagsToStreamResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsAddTagsToStreamRequest(ctx));
- })
- ADD_REQUEST(DisableEnhancedMonitoring, DisableEnhancedMonitoringRequest, DisableEnhancedMonitoringResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsDisableEnhancedMonitoringRequest(ctx));
- })
- ADD_REQUEST(EnableEnhancedMonitoring, EnableEnhancedMonitoringRequest, EnableEnhancedMonitoringResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsEnableEnhancedMonitoringRequest(ctx));
- })
- ADD_REQUEST(ListTagsForStream, ListTagsForStreamRequest, ListTagsForStreamResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsListTagsForStreamRequest(ctx));
- })
- ADD_REQUEST(MergeShards, MergeShardsRequest, MergeShardsResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsMergeShardsRequest(ctx));
- })
- ADD_REQUEST(RemoveTagsFromStream, RemoveTagsFromStreamRequest, RemoveTagsFromStreamResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsRemoveTagsFromStreamRequest(ctx));
- })
- ADD_REQUEST(SplitShard, SplitShardRequest, SplitShardResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsSplitShardRequest(ctx));
- })
- ADD_REQUEST(StartStreamEncryption, StartStreamEncryptionRequest, StartStreamEncryptionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsStartStreamEncryptionRequest(ctx));
- })
- ADD_REQUEST(StopStreamEncryption, StopStreamEncryptionRequest, StopStreamEncryptionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsStopStreamEncryptionRequest(ctx));
- })
- ADD_REQUEST(UpdateStream, UpdateStreamRequest, UpdateStreamResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsUpdateStreamRequest(ctx));
- })
- ADD_REQUEST(SetWriteQuota, SetWriteQuotaRequest, SetWriteQuotaResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvDataStreamsSetWriteQuotaRequest(ctx));
- })
+#define ADD_REQUEST(NAME, CB, ATTR) \
+ MakeIntrusive<TGRpcRequest<Ydb::DataStreams::V1::NAME##Request, Ydb::DataStreams::V1::NAME##Response, TGRpcDataStreamsService>> \
+ (this, &Service_, CQ_, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new TGrpcRequestOperationCall<Ydb::DataStreams::V1::NAME##Request, Ydb::DataStreams::V1::NAME##Response> \
+ (ctx, CB, TRequestAuxSettings{TRateLimiterMode::Off, ATTR})); \
+ }, &Ydb::DataStreams::V1::DataStreamsService::AsyncService::Request ## NAME, \
+ #NAME, logger, getCounterBlock("data_streams", #NAME))->Run();
+
+ ADD_REQUEST(DescribeStream, DoDataStreamsDescribeStreamRequest, nullptr)
+ ADD_REQUEST(CreateStream, DoDataStreamsCreateStreamRequest, nullptr)
+ ADD_REQUEST(ListStreams, DoDataStreamsListStreamsRequest, nullptr)
+ ADD_REQUEST(DeleteStream, DoDataStreamsDeleteStreamRequest, nullptr)
+ ADD_REQUEST(ListShards, DoDataStreamsListShardsRequest, nullptr)
+ ADD_REQUEST(PutRecord, DoDataStreamsPutRecordRequest, YdsProcessAttr)
+ ADD_REQUEST(PutRecords, DoDataStreamsPutRecordsRequest, YdsProcessAttr)
+ ADD_REQUEST(GetRecords, DoDataStreamsGetRecordsRequest, nullptr)
+ ADD_REQUEST(GetShardIterator, DoDataStreamsGetShardIteratorRequest, nullptr)
+ ADD_REQUEST(SubscribeToShard, DoDataStreamsSubscribeToShardRequest, nullptr)
+ ADD_REQUEST(DescribeLimits, DoDataStreamsDescribeLimitsRequest, nullptr)
+ ADD_REQUEST(DescribeStreamSummary, DoDataStreamsDescribeStreamSummaryRequest, nullptr)
+ ADD_REQUEST(DecreaseStreamRetentionPeriod, DoDataStreamsDecreaseStreamRetentionPeriodRequest, nullptr)
+ ADD_REQUEST(IncreaseStreamRetentionPeriod, DoDataStreamsIncreaseStreamRetentionPeriodRequest, nullptr)
+ ADD_REQUEST(UpdateShardCount, DoDataStreamsUpdateShardCountRequest, nullptr)
+ ADD_REQUEST(RegisterStreamConsumer, DoDataStreamsRegisterStreamConsumerRequest, nullptr)
+ ADD_REQUEST(DeregisterStreamConsumer, DoDataStreamsDeregisterStreamConsumerRequest, nullptr)
+ ADD_REQUEST(DescribeStreamConsumer, DoDataStreamsDescribeStreamConsumerRequest, nullptr)
+ ADD_REQUEST(ListStreamConsumers, DoDataStreamsListStreamConsumersRequest, nullptr)
+ ADD_REQUEST(AddTagsToStream, DoDataStreamsAddTagsToStreamRequest, nullptr)
+ ADD_REQUEST(DisableEnhancedMonitoring, DoDataStreamsDisableEnhancedMonitoringRequest, nullptr)
+ ADD_REQUEST(EnableEnhancedMonitoring, DoDataStreamsEnableEnhancedMonitoringRequest, nullptr)
+ ADD_REQUEST(ListTagsForStream, DoDataStreamsListTagsForStreamRequest, nullptr)
+ ADD_REQUEST(MergeShards, DoDataStreamsMergeShardsRequest, nullptr)
+ ADD_REQUEST(RemoveTagsFromStream, DoDataStreamsRemoveTagsFromStreamRequest, nullptr)
+ ADD_REQUEST(SplitShard, DoDataStreamsSplitShardRequest, nullptr)
+ ADD_REQUEST(StartStreamEncryption, DoDataStreamsStartStreamEncryptionRequest, nullptr)
+ ADD_REQUEST(StopStreamEncryption, DoDataStreamsStopStreamEncryptionRequest, nullptr)
+ ADD_REQUEST(UpdateStream, DoDataStreamsUpdateStreamRequest, nullptr)
+ ADD_REQUEST(SetWriteQuota, DoDataStreamsSetWriteQuotaRequest, nullptr)
#undef ADD_REQUEST
}
diff --git a/ydb/services/datastreams/grpc_service.h b/ydb/services/datastreams/grpc_service.h
index d8ed51df52..044aacb8a1 100644
--- a/ydb/services/datastreams/grpc_service.h
+++ b/ydb/services/datastreams/grpc_service.h
@@ -21,14 +21,12 @@ namespace NKikimr::NGRpcService {
private:
void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
- void InitNewSchemeCache();
NActors::TActorSystem* ActorSystem_;
grpc::ServerCompletionQueue* CQ_ = nullptr;
TIntrusivePtr<NMonitoring::TDynamicCounters> Counters_;
NActors::TActorId GRpcRequestProxyId_;
- NActors::TActorId NewSchemeCache;
NGrpc::TGlobalLimiter* Limiter_ = nullptr;
};
diff --git a/ydb/services/datastreams/put_records_actor.h b/ydb/services/datastreams/put_records_actor.h
index ff2e2beb86..8923dce1c6 100644
--- a/ydb/services/datastreams/put_records_actor.h
+++ b/ydb/services/datastreams/put_records_actor.h
@@ -1,8 +1,8 @@
#pragma once
+#include "datastreams_proxy.h"
#include "events.h"
-#include <ydb/core/grpc_services/grpc_request_proxy.h>
#include <ydb/core/persqueue/events/global.h>
#include <ydb/core/persqueue/write_meta.h>
#include <ydb/core/protos/msgbus_pq.pb.h>
@@ -17,8 +17,6 @@
namespace NKikimr::NDataStreams::V1 {
-
-
struct TPutRecordsItem {
TString Data;
TString Key;
@@ -218,7 +216,7 @@ namespace NKikimr::NDataStreams::V1 {
using TBase = NGRpcProxy::V1::TPQGrpcSchemaBase<TPutRecordsActorBase<TDerived, TProto>, TProto>;
public:
- TPutRecordsActorBase(TProto* request, NActors::TActorId newSchemeCache);
+ TPutRecordsActorBase(NGRpcService::IRequestOpCtx* request);
~TPutRecordsActorBase() = default;
void Bootstrap(const NActors::TActorContext &ctx);
@@ -235,7 +233,6 @@ namespace NKikimr::NDataStreams::V1 {
};
THashMap<ui32, TPartitionTask> PartitionToActor;
- NActors::TActorId NewSchemeCache;
Ydb::DataStreams::V1::PutRecordsResult PutRecordsResult;
TString Ip;
@@ -256,9 +253,8 @@ namespace NKikimr::NDataStreams::V1 {
};
template<class TDerived, class TProto>
- TPutRecordsActorBase<TDerived, TProto>::TPutRecordsActorBase(TProto* request, NActors::TActorId newSchemeCache)
- : TBase(request, request->GetProtoRequest()->stream_name())
- , NewSchemeCache(std::move(newSchemeCache))
+ TPutRecordsActorBase<TDerived, TProto>::TPutRecordsActorBase(NGRpcService::IRequestOpCtx* request)
+ : TBase(request, dynamic_cast<const typename TProto::TRequest*>(request->GetRequest())->stream_name())
, Ip(request->GetPeerName())
{
Y_ENSURE(request);
@@ -302,7 +298,7 @@ namespace NKikimr::NDataStreams::V1 {
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
entry.SyncVersion = true;
schemeCacheRequest->ResultSet.emplace_back(entry);
- ctx.Send(NewSchemeCache, MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(schemeCacheRequest.release()));
+ ctx.Send(MakeSchemeCacheID(), MakeHolder<TEvTxProxySchemeCache::TEvNavigateKeySet>(schemeCacheRequest.release()));
}
template<class TDerived, class TProto>
@@ -402,8 +398,8 @@ namespace NKikimr::NDataStreams::V1 {
public:
using TBase = TPutRecordsActorBase<TPutRecordsActor, NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest>;
- TPutRecordsActor(NKikimr::NGRpcService::TEvDataStreamsPutRecordsRequest* request, TActorId newSchemeCache)
- : TBase(request, newSchemeCache)
+ TPutRecordsActor(NGRpcService::IRequestOpCtx* request)
+ : TBase(request)
{}
const Ydb::DataStreams::V1::PutRecordsRequest& GetPutRecordsRequest() const;
@@ -424,15 +420,15 @@ namespace NKikimr::NDataStreams::V1 {
public:
using TBase = TPutRecordsActorBase<TPutRecordActor, NKikimr::NGRpcService::TEvDataStreamsPutRecordRequest>;
- TPutRecordActor(NKikimr::NGRpcService::TEvDataStreamsPutRecordRequest* request, TActorId newSchemeCache)
- : TBase(request, newSchemeCache)
+ TPutRecordActor(NGRpcService::IRequestOpCtx* request)
+ : TBase(request)
{
- PutRecordsRequest.set_stream_name(request->GetProtoRequest()->stream_name());
+ PutRecordsRequest.set_stream_name(GetProtoRequest()->stream_name());
auto& record = *PutRecordsRequest.add_records();
- record.set_data(request->GetProtoRequest()->data());
- record.set_explicit_hash_key(request->GetProtoRequest()->explicit_hash_key());
- record.set_partition_key(request->GetProtoRequest()->partition_key());
+ record.set_data(GetProtoRequest()->data());
+ record.set_explicit_hash_key(GetProtoRequest()->explicit_hash_key());
+ record.set_partition_key(GetProtoRequest()->partition_key());
}
const Ydb::DataStreams::V1::PutRecordsRequest& GetPutRecordsRequest() const;
diff --git a/ydb/services/lib/actors/pq_schema_actor.h b/ydb/services/lib/actors/pq_schema_actor.h
index 74e961793e..b9294498b2 100644
--- a/ydb/services/lib/actors/pq_schema_actor.h
+++ b/ydb/services/lib/actors/pq_schema_actor.h
@@ -66,8 +66,11 @@ namespace NKikimr::NGRpcProxy::V1 {
protected:
using TBase = NKikimr::NGRpcService::TRpcSchemeRequestActor<TDerived, TRequest>;
+ using TProtoRequest = typename TRequest::TRequest;
+
public:
- TPQGrpcSchemaBase(TRequest *request, const TString& topicPath)
+
+ TPQGrpcSchemaBase(NGRpcService::IRequestOpCtx *request, const TString& topicPath)
: TBase(request)
, TopicPath(topicPath)
{
@@ -286,7 +289,7 @@ namespace NKikimr::NGRpcProxy::V1 {
using TBase = TPQGrpcSchemaBase<TDerived, TRequest>;
public:
- TUpdateSchemeActor(TRequest* request, const TString& topicPath)
+ TUpdateSchemeActor(NGRpcService::IRequestOpCtx* request, const TString& topicPath)
: TBase(request, topicPath)
{}
~TUpdateSchemeActor() = default;
diff --git a/ydb/services/yq/grpc_service.cpp b/ydb/services/yq/grpc_service.cpp
index c7c0b6b0ae..bb71dde092 100644
--- a/ydb/services/yq/grpc_service.cpp
+++ b/ydb/services/yq/grpc_service.cpp
@@ -3,6 +3,7 @@
#include <ydb/core/grpc_services/grpc_helper.h>
#include <ydb/core/grpc_services/grpc_request_proxy.h>
#include <ydb/core/grpc_services/rpc_calls.h>
+#include <ydb/core/grpc_services/service_yq.h>
#include <ydb/library/protobuf_printer/security_printer.h>
namespace NKikimr::NGRpcService {
@@ -34,102 +35,154 @@ void TGRpcYandexQueryService::DecRequest() {
void TGRpcYandexQueryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
+ static const TVector<TString> CreateQueryPermissions = {
+ "yq.queries.create",
+ "yq.queries.invoke",
+ "yq.connections.use",
+ "yq.bindings.use",
+ "yq.resources.managePublic"
+ };
+ static const TVector<TString> ListQueriesPermissions = {
+ "yq.queries.get",
+ "yq.resources.viewPublic",
+ "yq.resources.viewPrivate"
+ };
+ static const TVector<TString> DescribeQueryPermissions = {
+ "yq.queries.get",
+ "yq.queries.viewAst",
+ "yq.resources.viewPublic",
+ "yq.resources.viewPrivate"
+ };
+ static const TVector<TString> GetQueryStatusPermissions = {
+ "yq.queries.getStatus",
+ "yq.resources.viewPublic",
+ "yq.resources.viewPrivate"
+ };
+ static const TVector<TString> ModifyQueryPermissions = {
+ "yq.queries.update",
+ "yq.queries.invoke",
+ "yq.connections.use",
+ "yq.bindings.use",
+ "yq.resources.managePublic",
+ "yq.resources.managePrivate"
+ };
+ static const TVector<TString> DeleteQueryPermissions = {
+ "yq.queries.delete",
+ "yq.resources.managePublic",
+ "yq.resources.managePrivate"
+ };
+ static const TVector<TString> ControlQueryPermissions = {
+ "yq.queries.control",
+ "yq.resources.managePublic",
+ "yq.resources.managePrivate"
+ };
+ static const TVector<TString> GetResultDataPermissions = {
+ "yq.queries.getData",
+ "yq.resources.viewPublic",
+ "yq.resources.viewPrivate"
+ };
+ static const TVector<TString> ListJobsPermissions = {
+ "yq.jobs.get",
+ "yq.resources.viewPublic",
+ "yq.resources.viewPrivate"
+ };
+ static const TVector<TString> DescribeJobPermissions = {
+ "yq.jobs.get",
+ "yq.resources.viewPublic",
+ "yq.resources.viewPrivate"
+ };
+ static const TVector<TString> CreateConnectionPermissions = {
+ "yq.connections.create",
+ "yq.resources.managePublic",
+ };
+ static const TVector<TString> ListConnectionsPermissions = {
+ "yq.connections.get",
+ "yq.resources.viewPublic",
+ "yq.resources.viewPrivate"
+ };
+ static const TVector<TString> DescribeConnectionPermissions = {
+ "yq.connections.get",
+ "yq.resources.viewPublic",
+ "yq.resources.viewPrivate"
+ };
+ static const TVector<TString> ModifyConnectionPermissions = {
+ "yq.connections.update",
+ "yq.resources.managePublic",
+ "yq.resources.managePrivate",
+ };
+ static const TVector<TString> DeleteConnectionPermissions = {
+ "yq.connections.delete",
+ "yq.resources.managePublic",
+ "yq.resources.managePrivate"
+ };
+ static const TVector<TString> TestConnectionPermissions = {
+ "yq.connections.create",
+ };
+ static const TVector<TString> CreateBindingPermissions = {
+ "yq.bindings.create",
+ "yq.resources.managePublic"
+ };
+ static const TVector<TString> ListBindingsPermissions = {
+ "yq.bindings.get",
+ "yq.resources.viewPublic",
+ "yq.resources.viewPrivate"
+ };
+ static const TVector<TString> DescribeBindingPermissions = {
+ "yq.bindings.get",
+ "yq.resources.viewPublic",
+ "yq.resources.viewPrivate"
+ };
+ static const TVector<TString> ModifyBindingPermissions = {
+ "yq.bindings.update",
+ "yq.resources.managePublic",
+ "yq.resources.managePrivate"
+ };
+ static const TVector<TString> DeleteBindingPermissions = {
+ "yq.bindings.delete",
+ "yq.resources.managePublic",
+ "yq.resources.managePrivate"
+ };
+
#ifdef ADD_REQUEST
#error ADD_REQUEST macro already defined
#endif
-#define ADD_REQUEST(NAME, IN, OUT, ACTION) \
- MakeIntrusive<TGRpcRequest<YandexQuery::IN, YandexQuery::OUT, TGRpcYandexQueryService, TSecurityTextFormatPrinter<YandexQuery::IN>, TSecurityTextFormatPrinter<YandexQuery::OUT>>>(this, &Service_, CQ_, \
- [this](NGrpc::IRequestContextBase *ctx) { \
- NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
- ACTION; \
- }, &YandexQuery::V1::YandexQueryService::AsyncService::Request ## NAME, \
- #NAME, logger, getCounterBlock("yq", #NAME))->Run();
-
- ADD_REQUEST(CreateQuery, CreateQueryRequest, CreateQueryResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryCreateQueryRequest(ctx));
- })
-
- ADD_REQUEST(ListQueries, ListQueriesRequest, ListQueriesResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryListQueriesRequest(ctx));
- })
-
- ADD_REQUEST(DescribeQuery, DescribeQueryRequest, DescribeQueryResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryDescribeQueryRequest(ctx));
- })
-
- ADD_REQUEST(GetQueryStatus, GetQueryStatusRequest, GetQueryStatusResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryGetQueryStatusRequest(ctx));
- })
-
- ADD_REQUEST(ModifyQuery, ModifyQueryRequest, ModifyQueryResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryModifyQueryRequest(ctx));
- })
-
- ADD_REQUEST(DeleteQuery, DeleteQueryRequest, DeleteQueryResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryDeleteQueryRequest(ctx));
- })
-
- ADD_REQUEST(ControlQuery, ControlQueryRequest, ControlQueryResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryControlQueryRequest(ctx));
- })
-
- ADD_REQUEST(GetResultData, GetResultDataRequest, GetResultDataResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryGetResultDataRequest(ctx));
- })
-
- ADD_REQUEST(ListJobs, ListJobsRequest, ListJobsResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryListJobsRequest(ctx));
- })
-
- ADD_REQUEST(DescribeJob, DescribeJobRequest, DescribeJobResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryDescribeJobRequest(ctx));
- })
-
- ADD_REQUEST(CreateConnection, CreateConnectionRequest, CreateConnectionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryCreateConnectionRequest(ctx));
- })
-
- ADD_REQUEST(ListConnections, ListConnectionsRequest, ListConnectionsResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryListConnectionsRequest(ctx));
- })
-
- ADD_REQUEST(DescribeConnection, DescribeConnectionRequest, DescribeConnectionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryDescribeConnectionRequest(ctx));
- })
-
- ADD_REQUEST(ModifyConnection, ModifyConnectionRequest, ModifyConnectionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryModifyConnectionRequest(ctx));
- })
-
- ADD_REQUEST(DeleteConnection, DeleteConnectionRequest, DeleteConnectionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryDeleteConnectionRequest(ctx));
- })
-
- ADD_REQUEST(TestConnection, TestConnectionRequest, TestConnectionResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryTestConnectionRequest(ctx));
- })
-
- ADD_REQUEST(CreateBinding, CreateBindingRequest, CreateBindingResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryCreateBindingRequest(ctx));
- })
-
- ADD_REQUEST(ListBindings, ListBindingsRequest, ListBindingsResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryListBindingsRequest(ctx));
- })
-
- ADD_REQUEST(DescribeBinding, DescribeBindingRequest, DescribeBindingResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryDescribeBindingRequest(ctx));
- })
-
- ADD_REQUEST(ModifyBinding, ModifyBindingRequest, ModifyBindingResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryModifyBindingRequest(ctx));
- })
-
- ADD_REQUEST(DeleteBinding, DeleteBindingRequest, DeleteBindingResponse, {
- ActorSystem_->Send(GRpcRequestProxyId_, new TEvYandexQueryDeleteBindingRequest(ctx));
- })
+#define ADD_REQUEST(NAME, CB, PERMISSIONS) \
+MakeIntrusive<TGRpcRequest<YandexQuery::NAME##Request, YandexQuery::NAME##Response, TGRpcYandexQueryService, TSecurityTextFormatPrinter<YandexQuery::NAME##Request>, TSecurityTextFormatPrinter<YandexQuery::NAME##Response>>>( \
+ this, &Service_, CQ_, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new TGrpcYqRequestOperationCall<YandexQuery::NAME##Request, YandexQuery::NAME##Response> \
+ (ctx, &CB, PERMISSIONS)); \
+ }, \
+ &YandexQuery::V1::YandexQueryService::AsyncService::Request##NAME, \
+ #NAME, logger, getCounterBlock("yq", #NAME)) \
+ ->Run(); \
+
+ ADD_REQUEST(CreateQuery, DoYandexQueryCreateQueryRequest, CreateQueryPermissions)
+ ADD_REQUEST(ListQueries, DoYandexQueryListQueriesRequest, ListQueriesPermissions)
+ ADD_REQUEST(DescribeQuery, DoYandexQueryDescribeQueryRequest, DescribeQueryPermissions)
+ ADD_REQUEST(GetQueryStatus, DoYandexQueryGetQueryStatusRequest, GetQueryStatusPermissions)
+ ADD_REQUEST(ModifyQuery, DoYandexQueryModifyQueryRequest, ModifyQueryPermissions)
+ ADD_REQUEST(DeleteQuery, DoYandexQueryDeleteQueryRequest, DeleteQueryPermissions)
+ ADD_REQUEST(ControlQuery, DoYandexQueryControlQueryRequest, ControlQueryPermissions)
+ ADD_REQUEST(GetResultData, DoGetResultDataRequest, GetResultDataPermissions)
+ ADD_REQUEST(ListJobs, DoListJobsRequest, ListJobsPermissions)
+ ADD_REQUEST(DescribeJob, DoDescribeJobRequest, DescribeJobPermissions)
+ ADD_REQUEST(CreateConnection, DoCreateConnectionRequest, CreateConnectionPermissions)
+ ADD_REQUEST(ListConnections, DoListConnectionsRequest, ListConnectionsPermissions)
+ ADD_REQUEST(DescribeConnection, DoDescribeConnectionRequest, DescribeConnectionPermissions)
+ ADD_REQUEST(ModifyConnection, DoModifyConnectionRequest, ModifyConnectionPermissions)
+ ADD_REQUEST(DeleteConnection, DoDeleteConnectionRequest, DeleteConnectionPermissions)
+ ADD_REQUEST(TestConnection, DoTestConnectionRequest, TestConnectionPermissions)
+ ADD_REQUEST(CreateBinding, DoCreateBindingRequest, CreateBindingPermissions)
+ ADD_REQUEST(ListBindings, DoListBindingsRequest, ListBindingsPermissions)
+ ADD_REQUEST(DescribeBinding, DoDescribeBindingRequest, DescribeBindingPermissions)
+ ADD_REQUEST(ModifyBinding, DoModifyBindingRequest, ModifyBindingPermissions)
+ ADD_REQUEST(DeleteBinding, DoDeleteBindingRequest, DeleteBindingPermissions)
#undef ADD_REQUEST
+
}
} // namespace NKikimr::NGRpcService