diff options
author | d-mokhnatkin <d-mokhnatkin@ydb.tech> | 2022-07-18 21:13:36 +0300 |
---|---|---|
committer | d-mokhnatkin <d-mokhnatkin@ydb.tech> | 2022-07-18 21:13:36 +0300 |
commit | 3041f85f9fa874cd45bf94d033fc4a95b3faf37f (patch) | |
tree | bab750c2de93eb51761e453869bdf6d25fe243a1 | |
parent | 0a3c71dcf8547728a97969eb913292de315aebb9 (diff) | |
download | ydb-3041f85f9fa874cd45bf94d033fc4a95b3faf37f.tar.gz |
add fq service
45 files changed, 6172 insertions, 144 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index d6157ca7859..660a32092f2 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -911,6 +911,7 @@ add_subdirectory(ydb/core/yq/libs/logs) add_subdirectory(ydb/services/auth) add_subdirectory(ydb/services/cms) add_subdirectory(ydb/services/discovery) +add_subdirectory(ydb/services/fq) add_subdirectory(ydb/services/kesus) add_subdirectory(ydb/services/local_discovery) add_subdirectory(ydb/services/monitoring) @@ -1081,9 +1082,11 @@ add_subdirectory(ydb/library/pretty_types_print/wilson) add_subdirectory(ydb/library/protobuf_printer/ut) add_subdirectory(ydb/library/schlab/ut) add_subdirectory(ydb/library/security/ut) +add_subdirectory(ydb/public/lib/fq) add_subdirectory(ydb/public/lib/idx_test) add_subdirectory(ydb/services/cms/ut) add_subdirectory(ydb/services/datastreams/ut) +add_subdirectory(ydb/services/fq/ut_integration) add_subdirectory(ydb/services/persqueue_cluster_discovery/ut) add_subdirectory(ydb/services/persqueue_v1/ut) add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index faad06f7475..e28e5cc6605 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -934,6 +934,7 @@ add_subdirectory(ydb/core/yq/libs/logs) add_subdirectory(ydb/services/auth) add_subdirectory(ydb/services/cms) add_subdirectory(ydb/services/discovery) +add_subdirectory(ydb/services/fq) add_subdirectory(ydb/services/kesus) add_subdirectory(ydb/services/local_discovery) add_subdirectory(ydb/services/monitoring) @@ -1102,9 +1103,11 @@ add_subdirectory(ydb/library/pretty_types_print/wilson) add_subdirectory(ydb/library/protobuf_printer/ut) add_subdirectory(ydb/library/schlab/ut) add_subdirectory(ydb/library/security/ut) +add_subdirectory(ydb/public/lib/fq) add_subdirectory(ydb/public/lib/idx_test) add_subdirectory(ydb/services/cms/ut) add_subdirectory(ydb/services/datastreams/ut) +add_subdirectory(ydb/services/fq/ut_integration) add_subdirectory(ydb/services/persqueue_cluster_discovery/ut) add_subdirectory(ydb/services/persqueue_v1/ut) add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic) diff --git a/ydb/core/driver_lib/run/CMakeLists.txt b/ydb/core/driver_lib/run/CMakeLists.txt index d2a0b6d61d7..66c53f7ff6a 100644 --- a/ydb/core/driver_lib/run/CMakeLists.txt +++ b/ydb/core/driver_lib/run/CMakeLists.txt @@ -117,6 +117,7 @@ target_link_libraries(run PUBLIC ydb-services-cms ydb-services-datastreams ydb-services-discovery + ydb-services-fq ydb-services-kesus ydb-services-local_discovery ydb-services-monitoring diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index f9cf13dc1ef..c063e630c05 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -78,31 +78,33 @@ #include <library/cpp/grpc/server/actors/logger.h> -#include <ydb/services/yq/private_grpc.h> +#include <ydb/services/auth/grpc_service.h> #include <ydb/services/cms/grpc_service.h> #include <ydb/services/datastreams/grpc_service.h> +#include <ydb/services/discovery/grpc_service.h> +#include <ydb/services/fq/grpc_service.h> +#include <ydb/services/fq/private_grpc.h> #include <ydb/services/kesus/grpc_service.h> +#include <ydb/services/local_discovery/grpc_service.h> #include <ydb/services/monitoring/grpc_service.h> -#include <ydb/services/auth/grpc_service.h> +#include <ydb/services/persqueue_cluster_discovery/grpc_service.h> +#include <ydb/services/persqueue_v1/persqueue.h> +#include <ydb/services/persqueue_v1/topic.h> +#include <ydb/services/rate_limiter/grpc_service.h> #include <ydb/services/ydb/ydb_clickhouse_internal.h> #include <ydb/services/ydb/ydb_dummy.h> #include <ydb/services/ydb/ydb_experimental.h> #include <ydb/services/ydb/ydb_export.h> #include <ydb/services/ydb/ydb_import.h> +#include <ydb/services/ydb/ydb_logstore.h> +#include <ydb/services/ydb/ydb_long_tx.h> #include <ydb/services/ydb/ydb_operation.h> #include <ydb/services/ydb/ydb_s3_internal.h> #include <ydb/services/ydb/ydb_scheme.h> #include <ydb/services/ydb/ydb_scripting.h> #include <ydb/services/ydb/ydb_table.h> -#include <ydb/services/ydb/ydb_long_tx.h> -#include <ydb/services/ydb/ydb_logstore.h> -#include <ydb/services/persqueue_cluster_discovery/grpc_service.h> -#include <ydb/services/persqueue_v1/persqueue.h> -#include <ydb/services/persqueue_v1/topic.h> -#include <ydb/services/rate_limiter/grpc_service.h> -#include <ydb/services/discovery/grpc_service.h> -#include <ydb/services/local_discovery/grpc_service.h> #include <ydb/services/yq/grpc_service.h> +#include <ydb/services/yq/private_grpc.h> #include <ydb/core/yq/libs/init/init.h> @@ -737,10 +739,13 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { if (hasYandexQuery) { server.AddService(new NGRpcService::TGRpcYandexQueryService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcFederatedQueryService(ActorSystem.Get(), Counters, grpcRequestProxyId)); // TODO: REMOVE next line after migration to "yq_private" server.AddService(new NGRpcService::TGRpcYqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId)); } /* REMOVE */ else /* THIS else as well and separate ifs */ if (hasYandexQueryPrivate) { server.AddService(new NGRpcService::TGRpcYqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId)); + server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId)); } if (hasLogStore) { diff --git a/ydb/core/grpc_services/CMakeLists.txt b/ydb/core/grpc_services/CMakeLists.txt index 658e23468b2..90ffceb87cc 100644 --- a/ydb/core/grpc_services/CMakeLists.txt +++ b/ydb/core/grpc_services/CMakeLists.txt @@ -85,6 +85,8 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_explain_yql_script.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_explain_data_query.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_forget_operation.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_fq_internal.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_fq.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_get_operation.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_get_shard_locations.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import.cpp diff --git a/ydb/core/grpc_services/rpc_fq.cpp b/ydb/core/grpc_services/rpc_fq.cpp new file mode 100644 index 00000000000..32fff6eecc4 --- /dev/null +++ b/ydb/core/grpc_services/rpc_fq.cpp @@ -0,0 +1,444 @@ +#include "rpc_common.h" +#include "rpc_deferrable.h" + +#include <ydb/core/grpc_services/service_fq.h> +#include <ydb/core/yq/libs/audit/events/events.h> +#include <ydb/core/yq/libs/audit/yq_audit_service.h> +#include <ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h> +#include <ydb/core/yq/libs/control_plane_proxy/events/events.h> +#include <ydb/core/yq/libs/control_plane_proxy/utils.h> +#include <ydb/public/api/protos/fq.pb.h> + +#include <ydb/library/aclib/aclib.h> + +#include <library/cpp/actors/core/hfunc.h> + +#include <util/generic/guid.h> +#include <util/string/split.h> + +namespace NKikimr { +namespace NGRpcService { + +using namespace Ydb; + +template <typename RpcRequestType, typename EvRequestType, typename EvResponseType, typename CastRequest, typename CastResult> +class TFederatedQueryRequestRPC : public TRpcOperationRequestActor< + TFederatedQueryRequestRPC<RpcRequestType,EvRequestType,EvResponseType, CastRequest, CastResult>, RpcRequestType> { + +public: + using TBase = TRpcOperationRequestActor< + TFederatedQueryRequestRPC<RpcRequestType,EvRequestType,EvResponseType,CastRequest,CastResult>, + RpcRequestType>; + using TBase::Become; + using TBase::Send; + using TBase::PassAway; + using TBase::Request_; + using TBase::GetProtoRequest; + +protected: + TString Token; + TString FolderId; + TString User; + TString PeerName; + TString UserAgent; + TString RequestId; + +public: + TFederatedQueryRequestRPC(IRequestOpCtx* request) + : TBase(request) {} + + void Bootstrap() { + auto requestCtx = Request_.get(); + + auto request = dynamic_cast<RpcRequestType*>(requestCtx); + Y_VERIFY(request); + + auto proxyCtx = dynamic_cast<IRequestProxyCtx*>(requestCtx); + Y_VERIFY(proxyCtx); + + PeerName = Request_->GetPeerName(); + UserAgent = Request_->GetPeerMetaValues("user-agent").GetOrElse("empty"); + RequestId = Request_->GetPeerMetaValues("x-request-id").GetOrElse(CreateGuidAsString()); + + TMaybe<TString> authToken = proxyCtx->GetYdbToken(); + if (!authToken) { + ReplyWithStatus("Token is empty", StatusIds::BAD_REQUEST); + return; + } + Token = *authToken; + + TString ydbProject = Request_->GetPeerMetaValues("x-ydb-fq-project").GetOrElse(""); + + if (!ydbProject.StartsWith("yandexcloud://")) { + ReplyWithStatus("x-ydb-fq-project should start with yandexcloud:// but got " + ydbProject, StatusIds::BAD_REQUEST); + return; + } + + const TVector<TString> path = StringSplitter(ydbProject).Split('/').SkipEmpty(); + if (path.size() != 2 && path.size() != 3) { + ReplyWithStatus("x-ydb-fq-project format is invalid. Must be yandexcloud://folder_id, but got " + ydbProject, StatusIds::BAD_REQUEST); + return; + } + + FolderId = path.back(); + if (!FolderId) { + ReplyWithStatus("Folder id is empty", StatusIds::BAD_REQUEST); + return; + } + + if (FolderId.length() > 1024) { + ReplyWithStatus("Folder id length greater than 1024 characters: " + FolderId, StatusIds::BAD_REQUEST); + return; + } + + const TString& internalToken = proxyCtx->GetInternalToken(); + TVector<TString> permissions; + if (internalToken) { + NACLib::TUserToken userToken(internalToken); + User = userToken.GetUserSID(); + for (const auto& sid: request->Sids) { + if (userToken.IsExist(sid)) { + permissions.push_back(sid); + } + } + } + + if (!User) { + ReplyWithStatus("Authorization error. Permission denied", StatusIds::UNAUTHORIZED); + return; + } + + const auto* req = GetProtoRequest(); + TProtoStringType protoString; + if (!req->SerializeToString(&protoString)) { + ReplyWithStatus("Can't serialize proto", StatusIds::BAD_REQUEST); + return; + } + auto castedRequest = CastRequest(); + if (!castedRequest.ParseFromString(protoString)) { + ReplyWithStatus("Can't deserialize proto", StatusIds::BAD_REQUEST); + return; + } + auto ev = MakeHolder<EvRequestType>(FolderId, castedRequest, User, Token, permissions); + Send(NYq::ControlPlaneProxyActorId(), ev.Release()); + Become(&TFederatedQueryRequestRPC<RpcRequestType, EvRequestType, EvResponseType, CastRequest, CastResult>::StateFunc); + } + +protected: + void ReplyWithStatus(const TString& issueMessage, StatusIds::StatusCode status) { + Request_->RaiseIssue(NYql::TIssue(issueMessage)); + Request_->ReplyWithYdbStatus(status); + PassAway(); + } + + STRICT_STFUNC(StateFunc, + hFunc(EvResponseType, Handle); + ) + + template <typename TResponse, typename TReq> + void SendResponse(const TResponse& response, TReq& req) { + if (response.Issues) { + req.RaiseIssues(response.Issues); + req.ReplyWithYdbStatus(StatusIds::BAD_REQUEST); + } else { + TProtoStringType protoString; + if (!response.Result.SerializeToString(&protoString)) { + ReplyWithStatus("Can't serialize proto", StatusIds::BAD_REQUEST); + return; + } + auto castedResult = CastResult(); + if (!castedResult.ParseFromString(protoString)) { + ReplyWithStatus("Can't deserialize proto", StatusIds::BAD_REQUEST); + return; + } + + req.SendResult(castedResult, StatusIds::SUCCESS); + } + } + + template <typename TResponse, typename TReq> requires requires (TResponse r) { r.AuditDetails; } + void SendResponse(const TResponse& response, TReq& req) { + if (response.Issues) { + req.RaiseIssues(response.Issues); + req.ReplyWithYdbStatus(StatusIds::BAD_REQUEST); + } else { + TProtoStringType protoString; + if (!response.Result.SerializeToString(&protoString)) { + ReplyWithStatus("Can't serialize proto", StatusIds::BAD_REQUEST); + return; + } + auto castedResponse = CastResult(); + if (!castedResponse.ParseFromString(protoString)) { + ReplyWithStatus("Can't deserialize proto", StatusIds::BAD_REQUEST); + return; + } + req.SendResult(castedResponse, StatusIds::SUCCESS); + } + + NYq::TEvAuditService::TExtraInfo extraInfo{ + .Token = Token, + .FolderId = FolderId, + .User = User, + .PeerName = PeerName, + .UserAgent = UserAgent, + .RequestId = RequestId, + }; + + const auto* protoReq = GetProtoRequest(); + TProtoStringType protoString; + if (!protoReq->SerializeToString(&protoString)) { + ReplyWithStatus("Can't serialize proto", StatusIds::BAD_REQUEST); + return; + } + auto castedProtoRequest = CastRequest(); + if (!castedProtoRequest.ParseFromString(protoString)) { + ReplyWithStatus("Can't deserialize proto", StatusIds::BAD_REQUEST); + return; + } + + Send(NYq::YqAuditServiceActorId(), NYq::TEvAuditService::MakeAuditEvent( + std::move(extraInfo), + castedProtoRequest, + response.Issues, + response.AuditDetails)); + } + + void Handle(typename EvResponseType::TPtr& ev) { + SendResponse(*ev->Get(), *Request_); + PassAway(); + } +}; + +using TFederatedQueryCreateQueryRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::CreateQueryRequest, FederatedQuery::CreateQueryResponse>, + NYq::TEvControlPlaneProxy::TEvCreateQueryRequest, + NYq::TEvControlPlaneProxy::TEvCreateQueryResponse, + YandexQuery::CreateQueryRequest, + FederatedQuery::CreateQueryResult>; + +void DoFederatedQueryCreateQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryCreateQueryRPC(p.release())); +} + +using TFederatedQueryListQueriesRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::ListQueriesRequest, FederatedQuery::ListQueriesResponse>, + NYq::TEvControlPlaneProxy::TEvListQueriesRequest, + NYq::TEvControlPlaneProxy::TEvListQueriesResponse, + YandexQuery::ListQueriesRequest, + FederatedQuery::ListQueriesResult>; + +void DoFederatedQueryListQueriesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryListQueriesRPC(p.release())); +} + +using TFederatedQueryDescribeQueryRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::DescribeQueryRequest, FederatedQuery::DescribeQueryResponse>, + NYq::TEvControlPlaneProxy::TEvDescribeQueryRequest, + NYq::TEvControlPlaneProxy::TEvDescribeQueryResponse, + YandexQuery::DescribeQueryRequest, + FederatedQuery::DescribeQueryResult>; + +void DoFederatedQueryDescribeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryDescribeQueryRPC(p.release())); +} + +using TFederatedQueryGetQueryStatusRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::GetQueryStatusRequest, FederatedQuery::GetQueryStatusResponse>, + NYq::TEvControlPlaneProxy::TEvGetQueryStatusRequest, + NYq::TEvControlPlaneProxy::TEvGetQueryStatusResponse, + YandexQuery::GetQueryStatusRequest, + FederatedQuery::GetQueryStatusResult>; + +void DoFederatedQueryGetQueryStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryGetQueryStatusRPC(p.release())); +} + +using TFederatedQueryModifyQueryRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::ModifyQueryRequest, FederatedQuery::ModifyQueryResponse>, + NYq::TEvControlPlaneProxy::TEvModifyQueryRequest, + NYq::TEvControlPlaneProxy::TEvModifyQueryResponse, + YandexQuery::ModifyQueryRequest, + FederatedQuery::ModifyQueryResult>; + +void DoFederatedQueryModifyQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryModifyQueryRPC(p.release())); +} + +using TFederatedQueryDeleteQueryRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::DeleteQueryRequest, FederatedQuery::DeleteQueryResponse>, + NYq::TEvControlPlaneProxy::TEvDeleteQueryRequest, + NYq::TEvControlPlaneProxy::TEvDeleteQueryResponse, + YandexQuery::DeleteQueryRequest, + FederatedQuery::DeleteQueryResult>; + +void DoFederatedQueryDeleteQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryDeleteQueryRPC(p.release())); +} + +using TFederatedQueryControlQueryRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::ControlQueryRequest, FederatedQuery::ControlQueryResponse>, + NYq::TEvControlPlaneProxy::TEvControlQueryRequest, + NYq::TEvControlPlaneProxy::TEvControlQueryResponse, + YandexQuery::ControlQueryRequest, + FederatedQuery::ControlQueryResult>; + +void DoFederatedQueryControlQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryControlQueryRPC(p.release())); +} + +using TFederatedQueryGetResultDataRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::GetResultDataRequest, FederatedQuery::GetResultDataResponse>, + NYq::TEvControlPlaneProxy::TEvGetResultDataRequest, + NYq::TEvControlPlaneProxy::TEvGetResultDataResponse, + YandexQuery::GetResultDataRequest, + FederatedQuery::GetResultDataResult>; + +void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryGetResultDataRPC(p.release())); +} + +using TFederatedQueryListJobsRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::ListJobsRequest, FederatedQuery::ListJobsResponse>, + NYq::TEvControlPlaneProxy::TEvListJobsRequest, + NYq::TEvControlPlaneProxy::TEvListJobsResponse, + YandexQuery::ListJobsRequest, + FederatedQuery::ListJobsResult>; + +void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryListJobsRPC(p.release())); +} + +using TFederatedQueryDescribeJobRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::DescribeJobRequest, FederatedQuery::DescribeJobResponse>, + NYq::TEvControlPlaneProxy::TEvDescribeJobRequest, + NYq::TEvControlPlaneProxy::TEvDescribeJobResponse, + YandexQuery::DescribeJobRequest, + FederatedQuery::DescribeJobResult>; + +void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryDescribeJobRPC(p.release())); +} + +using TFederatedQueryCreateConnectionRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::CreateConnectionRequest, FederatedQuery::CreateConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvCreateConnectionRequest, + NYq::TEvControlPlaneProxy::TEvCreateConnectionResponse, + YandexQuery::CreateConnectionRequest, + FederatedQuery::CreateConnectionResult>; + +void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryCreateConnectionRPC(p.release())); +} + +using TFederatedQueryListConnectionsRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::ListConnectionsRequest, FederatedQuery::ListConnectionsResponse>, + NYq::TEvControlPlaneProxy::TEvListConnectionsRequest, + NYq::TEvControlPlaneProxy::TEvListConnectionsResponse, + YandexQuery::ListConnectionsRequest, + FederatedQuery::ListConnectionsResult>; + +void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryListConnectionsRPC(p.release())); +} + +using TFederatedQueryDescribeConnectionRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::DescribeConnectionRequest, FederatedQuery::DescribeConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvDescribeConnectionRequest, + NYq::TEvControlPlaneProxy::TEvDescribeConnectionResponse, + YandexQuery::DescribeConnectionRequest, + FederatedQuery::DescribeConnectionResult>; + +void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryDescribeConnectionRPC(p.release())); +} + +using TFederatedQueryModifyConnectionRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::ModifyConnectionRequest, FederatedQuery::ModifyConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvModifyConnectionRequest, + NYq::TEvControlPlaneProxy::TEvModifyConnectionResponse, + YandexQuery::ModifyConnectionRequest, + FederatedQuery::ModifyConnectionResult>; + +void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryModifyConnectionRPC(p.release())); +} + +using TFederatedQueryDeleteConnectionRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::DeleteConnectionRequest, FederatedQuery::DeleteConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvDeleteConnectionRequest, + NYq::TEvControlPlaneProxy::TEvDeleteConnectionResponse, + YandexQuery::DeleteConnectionRequest, + FederatedQuery::DeleteConnectionResult>; + +void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryDeleteConnectionRPC(p.release())); +} + +using TFederatedQueryTestConnectionRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::TestConnectionRequest, FederatedQuery::TestConnectionResponse>, + NYq::TEvControlPlaneProxy::TEvTestConnectionRequest, + NYq::TEvControlPlaneProxy::TEvTestConnectionResponse, + YandexQuery::TestConnectionRequest, + FederatedQuery::TestConnectionResult>; + +void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryTestConnectionRPC(p.release())); +} + +using TFederatedQueryCreateBindingRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::CreateBindingRequest, FederatedQuery::CreateBindingResponse>, + NYq::TEvControlPlaneProxy::TEvCreateBindingRequest, + NYq::TEvControlPlaneProxy::TEvCreateBindingResponse, + YandexQuery::CreateBindingRequest, + FederatedQuery::CreateBindingResult>; + +void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& ) { + TActivationContext::AsActorContext().Register(new TFederatedQueryCreateBindingRPC(p.release())); +} + +using TFederatedQueryListBindingsRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::ListBindingsRequest, FederatedQuery::ListBindingsResponse>, + NYq::TEvControlPlaneProxy::TEvListBindingsRequest, + NYq::TEvControlPlaneProxy::TEvListBindingsResponse, + YandexQuery::ListBindingsRequest, + FederatedQuery::ListBindingsResult>; + +void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryListBindingsRPC(p.release())); +} + +using TFederatedQueryDescribeBindingRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::DescribeBindingRequest, FederatedQuery::DescribeBindingResponse>, + NYq::TEvControlPlaneProxy::TEvDescribeBindingRequest, + NYq::TEvControlPlaneProxy::TEvDescribeBindingResponse, + YandexQuery::DescribeBindingRequest, + FederatedQuery::DescribeBindingResult>; + +void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryDescribeBindingRPC(p.release())); +} + +using TFederatedQueryModifyBindingRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::ModifyBindingRequest, FederatedQuery::ModifyBindingResponse>, + NYq::TEvControlPlaneProxy::TEvModifyBindingRequest, + NYq::TEvControlPlaneProxy::TEvModifyBindingResponse, + YandexQuery::ModifyBindingRequest, + FederatedQuery::ModifyBindingResult>; + +void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryModifyBindingRPC(p.release())); +} + +using TFederatedQueryDeleteBindingRPC = TFederatedQueryRequestRPC< + TGrpcFqRequestOperationCall<FederatedQuery::DeleteBindingRequest, FederatedQuery::DeleteBindingResponse>, + NYq::TEvControlPlaneProxy::TEvDeleteBindingRequest, + NYq::TEvControlPlaneProxy::TEvDeleteBindingResponse, + YandexQuery::DeleteBindingRequest, + FederatedQuery::DeleteBindingResult>; + +void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFederatedQueryDeleteBindingRPC(p.release())); +} + +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_fq_internal.cpp b/ydb/core/grpc_services/rpc_fq_internal.cpp new file mode 100644 index 00000000000..b2f9c60c796 --- /dev/null +++ b/ydb/core/grpc_services/rpc_fq_internal.cpp @@ -0,0 +1,131 @@ +#include "service_fq_internal.h" +#include "rpc_common.h" +#include "rpc_deferrable.h" + +#include <ydb/core/yq/libs/events/events.h> +#include <ydb/core/yq/libs/actors/proxy_private.h> + +#include <library/cpp/actors/core/hfunc.h> + +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/yq/libs/protos/fq_private.pb.h> + +namespace NKikimr { +namespace NGRpcService { + +using TEvFqPrivatePingTaskRequest = + TGrpcRequestOperationCall<Fq::Private::PingTaskRequest, Fq::Private::PingTaskResponse>; +using TEvFqPrivateGetTaskRequest = + TGrpcRequestOperationCall<Fq::Private::GetTaskRequest, Fq::Private::GetTaskResponse>; +using TEvFqPrivateWriteTaskResultRequest = + TGrpcRequestOperationCall<Fq::Private::WriteTaskResultRequest, Fq::Private::WriteTaskResultResponse>; +using TEvFqPrivateNodesHealthCheckRequest = + TGrpcRequestOperationCall<Fq::Private::NodesHealthCheckRequest, Fq::Private::NodesHealthCheckResponse>; + +template <typename RpcRequestType, typename EvRequestType, typename EvResponseType, typename CastRequest, typename CastResult> +class TFqPrivateRequestRPC : public TRpcOperationRequestActor< + TFqPrivateRequestRPC<RpcRequestType,EvRequestType,EvResponseType,CastRequest,CastResult>, RpcRequestType> { + + using TBase = TRpcOperationRequestActor< + TFqPrivateRequestRPC<RpcRequestType,EvRequestType,EvResponseType,CastRequest,CastResult>, + RpcRequestType>; + +public: + TFqPrivateRequestRPC(IRequestOpCtx* request) : TBase(request) {} + + void Bootstrap(const TActorContext& ctx) { + Y_UNUSED(ctx); + const auto req = this->GetProtoRequest(); + TProtoStringType protoString; + Y_ENSURE(req->SerializeToString(&protoString)); + auto castedRequest = CastRequest(); + Y_ENSURE(castedRequest.ParseFromString(protoString)); + auto ev = MakeHolder<EvRequestType>(); + auto request = dynamic_cast<RpcRequestType*>(this->Request_.get()); + Y_VERIFY(request); + auto proxyCtx = dynamic_cast<IRequestProxyCtx*>(request); + Y_VERIFY(proxyCtx); + TString user; + const TString& internalToken = proxyCtx->GetInternalToken(); + if (internalToken) { + NACLib::TUserToken userToken(internalToken); + user = userToken.GetUserSID(); + } + ev->Record = castedRequest; + ev->User = user; + this->Send(NYq::MakeYqPrivateProxyId(), ev.Release()); + this->Become(&TFqPrivateRequestRPC<RpcRequestType, EvRequestType, EvResponseType, CastRequest, CastResult>::StateFunc); + } + +private: + STRICT_STFUNC(StateFunc, + HFunc(EvResponseType, Handle); + ) + + void Handle(typename EvResponseType::TPtr& ev, const TActorContext& ctx) { + SendResponse(ev, *this->Request_); + this->Die(ctx); + } + + template <typename TEv, typename TReq> + void SendResponse(const TEv& ev, TReq& req) { + if (!ev->Get()->Record) { + req.RaiseIssues(ev->Get()->Issues); + req.ReplyWithYdbStatus(ev->Get()->Status); + } else { + TProtoStringType protoString; + Y_ENSURE(ev->Get()->Record->SerializeToString(&protoString)); + auto castedResult = CastResult(); + Y_ENSURE(castedResult.ParseFromString(protoString)); + + req.SendResult(castedResult, ev->Get()->Status); + } + } +}; + +using TFqPrivatePingTaskRPC = TFqPrivateRequestRPC< + TEvFqPrivatePingTaskRequest, + NYq::TEvents::TEvPingTaskRequest, + NYq::TEvents::TEvPingTaskResponse, + Yq::Private::PingTaskRequest, + Fq::Private::PingTaskResult>; + +using TFqPrivateGetTaskRPC = TFqPrivateRequestRPC< + TEvFqPrivateGetTaskRequest, + NYq::TEvents::TEvGetTaskRequest, + NYq::TEvents::TEvGetTaskResponse, + Yq::Private::GetTaskRequest, + Fq::Private::GetTaskResult>; + +using TFqPrivateWriteTaskResultRPC = TFqPrivateRequestRPC< + TEvFqPrivateWriteTaskResultRequest, + NYq::TEvents::TEvWriteTaskResultRequest, + NYq::TEvents::TEvWriteTaskResultResponse, + Yq::Private::WriteTaskResultRequest, + Fq::Private::WriteTaskResultResult>; + +using TFqPrivateNodesHealthCheckRPC = TFqPrivateRequestRPC< + TEvFqPrivateNodesHealthCheckRequest, + NYq::TEvents::TEvNodesHealthCheckRequest, + NYq::TEvents::TEvNodesHealthCheckResponse, + Yq::Private::NodesHealthCheckRequest, + Fq::Private::NodesHealthCheckResult>; + +void DoFqPrivatePingTaskRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFqPrivatePingTaskRPC(p.release())); +} + +void DoFqPrivateGetTaskRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFqPrivateGetTaskRPC(p.release())); +} + +void DoFqPrivateWriteTaskResultRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFqPrivateWriteTaskResultRPC(p.release())); +} + +void DoFqPrivateNodesHealthCheckRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { + TActivationContext::AsActorContext().Register(new TFqPrivateNodesHealthCheckRPC(p.release())); +} + +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_yq.cpp b/ydb/core/grpc_services/rpc_yq.cpp index 0179f36cab6..0a43bc12acb 100644 --- a/ydb/core/grpc_services/rpc_yq.cpp +++ b/ydb/core/grpc_services/rpc_yq.cpp @@ -1,3 +1,5 @@ +// DEPRECATED!!! use rpc_fq.cpp + #include "rpc_common.h" #include "rpc_deferrable.h" @@ -236,7 +238,7 @@ using TYandexQueryGetResultDataRPC = TYandexQueryRequestRPC< NYq::TEvControlPlaneProxy::TEvGetResultDataRequest, NYq::TEvControlPlaneProxy::TEvGetResultDataResponse>; -void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { +void DoYandexQueryGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { TActivationContext::AsActorContext().Register(new TYandexQueryGetResultDataRPC(p.release())); } @@ -245,7 +247,7 @@ using TYandexQueryListJobsRPC = TYandexQueryRequestRPC< NYq::TEvControlPlaneProxy::TEvListJobsRequest, NYq::TEvControlPlaneProxy::TEvListJobsResponse>; -void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { +void DoYandexQueryListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { TActivationContext::AsActorContext().Register(new TYandexQueryListJobsRPC(p.release())); } @@ -254,7 +256,7 @@ using TYandexQueryDescribeJobRPC = TYandexQueryRequestRPC< NYq::TEvControlPlaneProxy::TEvDescribeJobRequest, NYq::TEvControlPlaneProxy::TEvDescribeJobResponse>; -void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { +void DoYandexQueryDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { TActivationContext::AsActorContext().Register(new TYandexQueryDescribeJobRPC(p.release())); } @@ -263,7 +265,7 @@ using TYandexQueryCreateConnectionRPC = TYandexQueryRequestRPC< NYq::TEvControlPlaneProxy::TEvCreateConnectionRequest, NYq::TEvControlPlaneProxy::TEvCreateConnectionResponse>; -void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { +void DoYandexQueryCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { TActivationContext::AsActorContext().Register(new TYandexQueryCreateConnectionRPC(p.release())); } @@ -272,7 +274,7 @@ using TYandexQueryListConnectionsRPC = TYandexQueryRequestRPC< NYq::TEvControlPlaneProxy::TEvListConnectionsRequest, NYq::TEvControlPlaneProxy::TEvListConnectionsResponse>; -void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { +void DoYandexQueryListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { TActivationContext::AsActorContext().Register(new TYandexQueryListConnectionsRPC(p.release())); } @@ -281,7 +283,7 @@ using TYandexQueryDescribeConnectionRPC = TYandexQueryRequestRPC< NYq::TEvControlPlaneProxy::TEvDescribeConnectionRequest, NYq::TEvControlPlaneProxy::TEvDescribeConnectionResponse>; -void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { +void DoYandexQueryDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { TActivationContext::AsActorContext().Register(new TYandexQueryDescribeConnectionRPC(p.release())); } @@ -290,7 +292,7 @@ using TYandexQueryModifyConnectionRPC = TYandexQueryRequestRPC< NYq::TEvControlPlaneProxy::TEvModifyConnectionRequest, NYq::TEvControlPlaneProxy::TEvModifyConnectionResponse>; -void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { +void DoYandexQueryModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { TActivationContext::AsActorContext().Register(new TYandexQueryModifyConnectionRPC(p.release())); } @@ -299,7 +301,7 @@ using TYandexQueryDeleteConnectionRPC = TYandexQueryRequestRPC< NYq::TEvControlPlaneProxy::TEvDeleteConnectionRequest, NYq::TEvControlPlaneProxy::TEvDeleteConnectionResponse>; -void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { +void DoYandexQueryDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { TActivationContext::AsActorContext().Register(new TYandexQueryDeleteConnectionRPC(p.release())); } @@ -308,7 +310,7 @@ using TYandexQueryTestConnectionRPC = TYandexQueryRequestRPC< NYq::TEvControlPlaneProxy::TEvTestConnectionRequest, NYq::TEvControlPlaneProxy::TEvTestConnectionResponse>; -void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { +void DoYandexQueryTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { TActivationContext::AsActorContext().Register(new TYandexQueryTestConnectionRPC(p.release())); } @@ -317,7 +319,7 @@ using TYandexQueryCreateBindingRPC = TYandexQueryRequestRPC< NYq::TEvControlPlaneProxy::TEvCreateBindingRequest, NYq::TEvControlPlaneProxy::TEvCreateBindingResponse>; -void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& ) { +void DoYandexQueryCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& ) { TActivationContext::AsActorContext().Register(new TYandexQueryCreateBindingRPC(p.release())); } @@ -326,7 +328,7 @@ using TYandexQueryListBindingsRPC = TYandexQueryRequestRPC< NYq::TEvControlPlaneProxy::TEvListBindingsRequest, NYq::TEvControlPlaneProxy::TEvListBindingsResponse>; -void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { +void DoYandexQueryListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { TActivationContext::AsActorContext().Register(new TYandexQueryListBindingsRPC(p.release())); } @@ -335,7 +337,7 @@ using TYandexQueryDescribeBindingRPC = TYandexQueryRequestRPC< NYq::TEvControlPlaneProxy::TEvDescribeBindingRequest, NYq::TEvControlPlaneProxy::TEvDescribeBindingResponse>; -void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { +void DoYandexQueryDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { TActivationContext::AsActorContext().Register(new TYandexQueryDescribeBindingRPC(p.release())); } @@ -344,7 +346,7 @@ using TYandexQueryModifyBindingRPC = TYandexQueryRequestRPC< NYq::TEvControlPlaneProxy::TEvModifyBindingRequest, NYq::TEvControlPlaneProxy::TEvModifyBindingResponse>; -void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { +void DoYandexQueryModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { TActivationContext::AsActorContext().Register(new TYandexQueryModifyBindingRPC(p.release())); } @@ -353,7 +355,7 @@ using TYandexQueryDeleteBindingRPC = TYandexQueryRequestRPC< NYq::TEvControlPlaneProxy::TEvDeleteBindingRequest, NYq::TEvControlPlaneProxy::TEvDeleteBindingResponse>; -void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { +void DoYandexQueryDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { TActivationContext::AsActorContext().Register(new TYandexQueryDeleteBindingRPC(p.release())); } diff --git a/ydb/core/grpc_services/service_fq.h b/ydb/core/grpc_services/service_fq.h new file mode 100644 index 00000000000..99e52fbb137 --- /dev/null +++ b/ydb/core/grpc_services/service_fq.h @@ -0,0 +1,90 @@ +#pragma once + +#include <algorithm> +#include <memory> + +#include <ydb/core/base/ticket_parser.h> +#include <ydb/core/yq/libs/control_plane_proxy/utils.h> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +template <typename TReq, typename TResp> +class TGrpcFqRequestOperationCall : public TGrpcRequestOperationCall<TReq, TResp> { +public: + using TBase = TGrpcRequestOperationCall<TReq, TResp>; + using TBase::GetProtoRequest; + using TBase::GetPeerMetaValues; + using NPerms = NKikimr::TEvTicketParser::TEvAuthorizeTicket; + + const std::function<TVector<NPerms::TPermission>(const TReq&)>& Permissions; + TVector<TString> Sids; + + TGrpcFqRequestOperationCall(NGrpc::IRequestContextBase* ctx, + void (*cb)(std::unique_ptr<IRequestOpCtx>, const IFacilityProvider&), + const std::function<TVector<NPerms::TPermission>(const TReq&)>& permissions) + : TGrpcRequestOperationCall<TReq, TResp>(ctx, cb, {}), Permissions(permissions) { + } + + bool TryCustomAttributeProcess(const TSchemeBoardEvents::TDescribeSchemeResult& , ICheckerIface* iface) override { + TString scope = GetPeerMetaValues("x-ydb-fq-project").GetOrElse(""); + if (scope.StartsWith("yandexcloud://")) { + const TVector<TString> path = StringSplitter(scope).Split('/').SkipEmpty(); + if (path.size() == 2 || path.size() == 3) { + const TString& folderId = path.back(); + const auto& permissions = Permissions(*GetProtoRequest()); + TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> entries {{ + permissions, + { + {"folder_id", folderId} + } + }}; + std::transform(permissions.begin(), permissions.end(), std::back_inserter(Sids), + [](const auto& s) -> TString { return s.Permission + "@as"; }); + + auto serviceAccountId = NYq::ExtractServiceAccountId(*GetProtoRequest()); + if (serviceAccountId) { + entries.push_back({ + {{NPerms::Required("iam.serviceAccounts.use")}}, + { + {"service_account_id", serviceAccountId} + }}); + Sids.push_back("iam.serviceAccounts.use@as"); + } + + iface->SetEntries(entries); + return true; + } + } + + return false; + } +}; + +void DoFederatedQueryCreateQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoFederatedQueryListQueriesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoFederatedQueryDescribeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoFederatedQueryGetQueryStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoFederatedQueryModifyQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoFederatedQueryDeleteQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoFederatedQueryControlQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); + +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/core/grpc_services/service_fq_internal.h b/ydb/core/grpc_services/service_fq_internal.h new file mode 100644 index 00000000000..2b090de59ba --- /dev/null +++ b/ydb/core/grpc_services/service_fq_internal.h @@ -0,0 +1,17 @@ +#pragma once + +#include <memory> + +namespace NKikimr { +namespace NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +void DoFqPrivatePingTaskRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoFqPrivateGetTaskRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoFqPrivateWriteTaskResultRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoFqPrivateNodesHealthCheckRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); + +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/core/grpc_services/service_yq.h b/ydb/core/grpc_services/service_yq.h index 612aea4ec7d..3afc2180e27 100644 --- a/ydb/core/grpc_services/service_yq.h +++ b/ydb/core/grpc_services/service_yq.h @@ -74,20 +74,20 @@ void DoYandexQueryGetQueryStatusRequest(std::unique_ptr<IRequestOpCtx> p, const void DoYandexQueryModifyQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); void DoYandexQueryDeleteQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); void DoYandexQueryControlQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); +void DoYandexQueryDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); } // namespace NGRpcService } // namespace NKikimr diff --git a/ydb/core/testlib/CMakeLists.txt b/ydb/core/testlib/CMakeLists.txt index 8d386751ae2..f0411e6c092 100644 --- a/ydb/core/testlib/CMakeLists.txt +++ b/ydb/core/testlib/CMakeLists.txt @@ -45,8 +45,8 @@ target_link_libraries(ydb-core-testlib PUBLIC core-mind-bscontroller core-mind-hive ydb-core-node_whiteboard - ydb-core-protos ydb-core-persqueue + ydb-core-protos ydb-core-security core-sys_view-processor core-sys_view-service @@ -80,6 +80,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-services-cms ydb-services-datastreams ydb-services-discovery + ydb-services-fq ydb-services-kesus ydb-services-persqueue_cluster_discovery ydb-services-persqueue_v1 diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 0ccf2f38270..d1319f60597 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -8,6 +8,8 @@ #include <ydb/services/auth/grpc_service.h> #include <ydb/services/yq/grpc_service.h> #include <ydb/services/yq/private_grpc.h> +#include <ydb/services/fq/grpc_service.h> +#include <ydb/services/fq/private_grpc.h> #include <ydb/services/cms/grpc_service.h> #include <ydb/services/datastreams/grpc_service.h> #include <ydb/services/kesus/grpc_service.h> @@ -328,6 +330,8 @@ namespace Tests { if (Settings->EnableYq) { GRpcServer->AddService(new NGRpcService::TGRpcYandexQueryService(system, counters, grpcRequestProxyId)); GRpcServer->AddService(new NGRpcService::TGRpcYqPrivateTaskService(system, counters, grpcRequestProxyId)); + GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxyId)); + GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxyId)); } if (const auto& factory = Settings->GrpcServiceFactory) { // All services enabled by default for ut diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h index c0f82ed566b..09cfddad7f3 100644 --- a/ydb/core/yq/libs/control_plane_storage/message_builders.h +++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h @@ -1,39 +1,41 @@ #pragma once +#include "message_builders_yq.h" + #include <util/datetime/base.h> #include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h> #include <ydb/library/yql/public/issue/yql_issue_message.h> -#include <ydb/public/api/protos/yq.pb.h> +#include <ydb/public/api/protos/fq.pb.h> #include <ydb/core/yq/libs/control_plane_storage/events/events.h> #include <library/cpp/protobuf/interop/cast.h> -namespace NYq { +namespace NFq { // Queries class TCreateQueryBuilder { - YandexQuery::CreateQueryRequest Request; + FederatedQuery::CreateQueryRequest Request; public: TCreateQueryBuilder() { - SetMode(YandexQuery::RUN); - SetType(YandexQuery::QueryContent::ANALYTICS); + SetMode(FederatedQuery::RUN); + SetType(FederatedQuery::QueryContent::ANALYTICS); SetName("test_query_name_1"); - SetVisibility(YandexQuery::Acl::SCOPE); + SetVisibility(FederatedQuery::Acl::SCOPE); SetText("SELECT 1;"); } - TCreateQueryBuilder& SetMode(YandexQuery::ExecuteMode mode) + TCreateQueryBuilder& SetMode(FederatedQuery::ExecuteMode mode) { Request.set_execute_mode(mode); return *this; } - TCreateQueryBuilder& SetType(YandexQuery::QueryContent::QueryType type) + TCreateQueryBuilder& SetType(FederatedQuery::QueryContent::QueryType type) { Request.mutable_content()->set_type(type); return *this; @@ -45,7 +47,7 @@ public: return *this; } - TCreateQueryBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility) + TCreateQueryBuilder& SetVisibility(FederatedQuery::Acl::Visibility visibility) { Request.mutable_content()->mutable_acl()->set_visibility(visibility); return *this; @@ -69,7 +71,7 @@ public: return *this; } - TCreateQueryBuilder& SetDisposition(const YandexQuery::StreamingDisposition& disposition) + TCreateQueryBuilder& SetDisposition(const FederatedQuery::StreamingDisposition& disposition) { *Request.mutable_disposition() = disposition; return *this; @@ -81,14 +83,14 @@ public: return *this; } - const YandexQuery::CreateQueryRequest& Build() + const FederatedQuery::CreateQueryRequest& Build() { return Request; } }; class TListQueriesBuilder { - YandexQuery::ListQueriesRequest Request; + FederatedQuery::ListQueriesRequest Request; public: TListQueriesBuilder() @@ -108,14 +110,14 @@ public: return *this; } - const YandexQuery::ListQueriesRequest& Build() + const FederatedQuery::ListQueriesRequest& Build() { return Request; } }; class TDescribeQueryBuilder { - YandexQuery::DescribeQueryRequest Request; + FederatedQuery::DescribeQueryRequest Request; public: TDescribeQueryBuilder& SetQueryId(const TString& queryId) @@ -124,14 +126,14 @@ public: return *this; } - const YandexQuery::DescribeQueryRequest& Build() + const FederatedQuery::DescribeQueryRequest& Build() { return Request; } }; class TGetQueryStatusBuilder { - YandexQuery::GetQueryStatusRequest Request; + FederatedQuery::GetQueryStatusRequest Request; public: TGetQueryStatusBuilder& SetQueryId(const TString& queryId) @@ -140,14 +142,14 @@ public: return *this; } - const YandexQuery::GetQueryStatusRequest& Build() + const FederatedQuery::GetQueryStatusRequest& Build() { return Request; } }; class TDeleteQueryBuilder { - YandexQuery::DeleteQueryRequest Request; + FederatedQuery::DeleteQueryRequest Request; public: TDeleteQueryBuilder& SetQueryId(const TString& queryId) @@ -168,22 +170,22 @@ public: return *this; } - const YandexQuery::DeleteQueryRequest& Build() + const FederatedQuery::DeleteQueryRequest& Build() { return Request; } }; class TModifyQueryBuilder { - YandexQuery::ModifyQueryRequest Request; + FederatedQuery::ModifyQueryRequest Request; public: TModifyQueryBuilder() { SetName("test_query_name_2"); - SetMode(YandexQuery::RUN); - SetType(YandexQuery::QueryContent::ANALYTICS); - SetVisibility(YandexQuery::Acl::SCOPE); + SetMode(FederatedQuery::RUN); + SetType(FederatedQuery::QueryContent::ANALYTICS); + SetVisibility(FederatedQuery::Acl::SCOPE); SetText("SELECT 1;"); } @@ -193,19 +195,19 @@ public: return *this; } - TModifyQueryBuilder& SetType(YandexQuery::QueryContent::QueryType type) + TModifyQueryBuilder& SetType(FederatedQuery::QueryContent::QueryType type) { Request.mutable_content()->set_type(type); return *this; } - TModifyQueryBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility) + TModifyQueryBuilder& SetVisibility(FederatedQuery::Acl::Visibility visibility) { Request.mutable_content()->mutable_acl()->set_visibility(visibility); return *this; } - TModifyQueryBuilder& SetMode(YandexQuery::ExecuteMode mode) + TModifyQueryBuilder& SetMode(FederatedQuery::ExecuteMode mode) { Request.set_execute_mode(mode); return *this; @@ -223,13 +225,13 @@ public: return *this; } - TModifyQueryBuilder& SetDisposition(const YandexQuery::StreamingDisposition& disposition) + TModifyQueryBuilder& SetDisposition(const FederatedQuery::StreamingDisposition& disposition) { *Request.mutable_disposition() = disposition; return *this; } - TModifyQueryBuilder& SetState(const YandexQuery::StateLoadMode& state) + TModifyQueryBuilder& SetState(const FederatedQuery::StateLoadMode& state) { Request.set_state_load_mode(state); return *this; @@ -259,22 +261,22 @@ public: return *this; } - const YandexQuery::ModifyQueryRequest& Build() + const FederatedQuery::ModifyQueryRequest& Build() { return Request; } }; class TControlQueryBuilder { - YandexQuery::ControlQueryRequest Request; + FederatedQuery::ControlQueryRequest Request; public: TControlQueryBuilder() { - SetAction(YandexQuery::ABORT); + SetAction(FederatedQuery::ABORT); } - TControlQueryBuilder& SetAction(const YandexQuery::QueryAction& action) + TControlQueryBuilder& SetAction(const FederatedQuery::QueryAction& action) { Request.set_action(action); return *this; @@ -298,7 +300,7 @@ public: return *this; } - const YandexQuery::ControlQueryRequest& Build() + const FederatedQuery::ControlQueryRequest& Build() { return Request; } @@ -307,7 +309,7 @@ public: // Results class TGetResultDataBuilder { - YandexQuery::GetResultDataRequest Request; + FederatedQuery::GetResultDataRequest Request; public: TGetResultDataBuilder() @@ -339,7 +341,7 @@ public: return *this; } - const YandexQuery::GetResultDataRequest& Build() + const FederatedQuery::GetResultDataRequest& Build() { return Request; } @@ -348,7 +350,7 @@ public: // Jobs class TListJobsBuilder { - YandexQuery::ListJobsRequest Request; + FederatedQuery::ListJobsRequest Request; public: TListJobsBuilder() @@ -374,14 +376,14 @@ public: return *this; } - const YandexQuery::ListJobsRequest& Build() + const FederatedQuery::ListJobsRequest& Build() { return Request; } }; class TDescribeJobBuilder { - YandexQuery::DescribeJobRequest Request; + FederatedQuery::DescribeJobRequest Request; public: TDescribeJobBuilder& SetJobId(const TString& jobId) @@ -390,7 +392,7 @@ public: return *this; } - const YandexQuery::DescribeJobRequest& Build() + const FederatedQuery::DescribeJobRequest& Build() { return Request; } @@ -399,13 +401,13 @@ public: // Connections class TCreateConnectionBuilder { - YandexQuery::CreateConnectionRequest Request; + FederatedQuery::CreateConnectionRequest Request; public: TCreateConnectionBuilder() { SetName("test_connection_name_1"); - SetVisibility(YandexQuery::Acl::SCOPE); + SetVisibility(FederatedQuery::Acl::SCOPE); CreateDataStreams("my_database_id", ""); } @@ -477,7 +479,7 @@ public: return *this; } - TCreateConnectionBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility) + TCreateConnectionBuilder& SetVisibility(FederatedQuery::Acl::Visibility visibility) { Request.mutable_content()->mutable_acl()->set_visibility(visibility); return *this; @@ -501,14 +503,14 @@ public: return *this; } - const YandexQuery::CreateConnectionRequest& Build() + const FederatedQuery::CreateConnectionRequest& Build() { return Request; } }; class TListConnectionsBuilder { - YandexQuery::ListConnectionsRequest Request; + FederatedQuery::ListConnectionsRequest Request; public: TListConnectionsBuilder() @@ -528,14 +530,14 @@ public: return *this; } - const YandexQuery::ListConnectionsRequest& Build() + const FederatedQuery::ListConnectionsRequest& Build() { return Request; } }; class TDescribeConnectionBuilder { - YandexQuery::DescribeConnectionRequest Request; + FederatedQuery::DescribeConnectionRequest Request; public: TDescribeConnectionBuilder& SetConnectionId(const TString& connectionId) @@ -544,20 +546,20 @@ public: return *this; } - const YandexQuery::DescribeConnectionRequest& Build() + const FederatedQuery::DescribeConnectionRequest& Build() { return Request; } }; class TModifyConnectionBuilder { - YandexQuery::ModifyConnectionRequest Request; + FederatedQuery::ModifyConnectionRequest Request; public: TModifyConnectionBuilder() { SetName("test_connection_name_2"); - SetVisibility(YandexQuery::Acl::SCOPE); + SetVisibility(FederatedQuery::Acl::SCOPE); CreateDataStreams("my_database_id", ""); } @@ -615,7 +617,7 @@ public: return *this; } - TModifyConnectionBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility) + TModifyConnectionBuilder& SetVisibility(FederatedQuery::Acl::Visibility visibility) { Request.mutable_content()->mutable_acl()->set_visibility(visibility); return *this; @@ -651,14 +653,14 @@ public: return *this; } - const YandexQuery::ModifyConnectionRequest& Build() + const FederatedQuery::ModifyConnectionRequest& Build() { return Request; } }; class TDeleteConnectionBuilder { - YandexQuery::DeleteConnectionRequest Request; + FederatedQuery::DeleteConnectionRequest Request; public: TDeleteConnectionBuilder& SetConnectionId(const TString& connectionId) @@ -679,7 +681,7 @@ public: return *this; } - const YandexQuery::DeleteConnectionRequest& Build() + const FederatedQuery::DeleteConnectionRequest& Build() { return Request; } @@ -688,14 +690,14 @@ public: // Bindings class TCreateBindingBuilder { - YandexQuery::CreateBindingRequest Request; + FederatedQuery::CreateBindingRequest Request; public: TCreateBindingBuilder() { SetName("test_binding_name_1"); - SetVisibility(YandexQuery::Acl::SCOPE); - YandexQuery::DataStreamsBinding binding; + SetVisibility(FederatedQuery::Acl::SCOPE); + FederatedQuery::DataStreamsBinding binding; binding.set_stream_name("my_stream"); binding.set_format("json"); binding.set_compression("zip"); @@ -711,19 +713,19 @@ public: return *this; } - TCreateBindingBuilder& CreateDataStreams(const YandexQuery::DataStreamsBinding& binding) + TCreateBindingBuilder& CreateDataStreams(const FederatedQuery::DataStreamsBinding& binding) { *Request.mutable_content()->mutable_setting()->mutable_data_streams() = binding; return *this; } - TCreateBindingBuilder& CreateObjectStorage(const YandexQuery::ObjectStorageBinding& binding) + TCreateBindingBuilder& CreateObjectStorage(const FederatedQuery::ObjectStorageBinding& binding) { *Request.mutable_content()->mutable_setting()->mutable_object_storage() = binding; return *this; } - TCreateBindingBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility) + TCreateBindingBuilder& SetVisibility(FederatedQuery::Acl::Visibility visibility) { Request.mutable_content()->mutable_acl()->set_visibility(visibility); return *this; @@ -747,14 +749,14 @@ public: return *this; } - const YandexQuery::CreateBindingRequest& Build() + const FederatedQuery::CreateBindingRequest& Build() { return Request; } }; class TListBindingsBuilder { - YandexQuery::ListBindingsRequest Request; + FederatedQuery::ListBindingsRequest Request; public: TListBindingsBuilder() @@ -780,14 +782,14 @@ public: return *this; } - const YandexQuery::ListBindingsRequest& Build() + const FederatedQuery::ListBindingsRequest& Build() { return Request; } }; class TDescribeBindingBuilder { - YandexQuery::DescribeBindingRequest Request; + FederatedQuery::DescribeBindingRequest Request; public: TDescribeBindingBuilder& SetBindingId(const TString& bindingId) @@ -796,21 +798,21 @@ public: return *this; } - const YandexQuery::DescribeBindingRequest& Build() + const FederatedQuery::DescribeBindingRequest& Build() { return Request; } }; class TModifyBindingBuilder { - YandexQuery::ModifyBindingRequest Request; + FederatedQuery::ModifyBindingRequest Request; public: TModifyBindingBuilder() { SetName("test_binding_name_2"); - SetVisibility(YandexQuery::Acl::SCOPE); - YandexQuery::DataStreamsBinding binding; + SetVisibility(FederatedQuery::Acl::SCOPE); + FederatedQuery::DataStreamsBinding binding; binding.set_stream_name("my_stream"); binding.set_format("json"); binding.set_compression("zip"); @@ -826,19 +828,19 @@ public: return *this; } - TModifyBindingBuilder& CreateDataStreams(const YandexQuery::DataStreamsBinding& binding) + TModifyBindingBuilder& CreateDataStreams(const FederatedQuery::DataStreamsBinding& binding) { *Request.mutable_content()->mutable_setting()->mutable_data_streams() = binding; return *this; } - TModifyBindingBuilder& CreateObjectStorage(const YandexQuery::ObjectStorageBinding& binding) + TModifyBindingBuilder& CreateObjectStorage(const FederatedQuery::ObjectStorageBinding& binding) { *Request.mutable_content()->mutable_setting()->mutable_object_storage() = binding; return *this; } - TModifyBindingBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility) + TModifyBindingBuilder& SetVisibility(FederatedQuery::Acl::Visibility visibility) { Request.mutable_content()->mutable_acl()->set_visibility(visibility); return *this; @@ -874,14 +876,14 @@ public: return *this; } - const YandexQuery::ModifyBindingRequest& Build() + const FederatedQuery::ModifyBindingRequest& Build() { return Request; } }; class TDeleteBindingBuilder { - YandexQuery::DeleteBindingRequest Request; + FederatedQuery::DeleteBindingRequest Request; public: TDeleteBindingBuilder& SetBindingId(const TString& bindingId) @@ -902,7 +904,7 @@ public: return *this; } - const YandexQuery::DeleteBindingRequest& Build() + const FederatedQuery::DeleteBindingRequest& Build() { return Request; } @@ -957,9 +959,9 @@ public: return *this; } - std::unique_ptr<TEvControlPlaneStorage::TEvWriteResultDataRequest> Build() + std::unique_ptr<NYq::TEvControlPlaneStorage::TEvWriteResultDataRequest> Build() { - auto request = std::make_unique<TEvControlPlaneStorage::TEvWriteResultDataRequest>(); + auto request = std::make_unique<NYq::TEvControlPlaneStorage::TEvWriteResultDataRequest>(); request->Request.mutable_result_id()->set_value(ResultId); *request->Request.mutable_result_set() = ResultSet; request->Request.set_result_set_id(ResultSetId); @@ -1004,9 +1006,9 @@ public: return *this; } - std::unique_ptr<TEvControlPlaneStorage::TEvGetTaskRequest> Build() + std::unique_ptr<NYq::TEvControlPlaneStorage::TEvGetTaskRequest> Build() { - auto request = std::make_unique<TEvControlPlaneStorage::TEvGetTaskRequest>(); + auto request = std::make_unique<NYq::TEvControlPlaneStorage::TEvGetTaskRequest>(); request->Request.set_tenant(TenantName); request->Request.set_owner_id(Owner); request->Request.set_host(HostName); @@ -1022,11 +1024,11 @@ class TPingTaskBuilder { TString ResultId; TString Owner; TInstant Deadline; - TMaybe<YandexQuery::QueryMeta::ComputeStatus> Status; + TMaybe<FederatedQuery::QueryMeta::ComputeStatus> Status; TMaybe<NYql::TIssues> Issues; TMaybe<NYql::TIssues> TransientIssues; TMaybe<TString> Statistics; - TMaybe<TVector<YandexQuery::ResultSetMeta>> ResultSetMetas; + TMaybe<TVector<FederatedQuery::ResultSetMeta>> ResultSetMetas; TMaybe<TString> Ast; TMaybe<TString> Plan; TMaybe<TInstant> StartedAt; @@ -1086,7 +1088,7 @@ public: return *this; } - TPingTaskBuilder& SetStatus(const YandexQuery::QueryMeta::ComputeStatus& status) + TPingTaskBuilder& SetStatus(const FederatedQuery::QueryMeta::ComputeStatus& status) { Status = status; return *this; @@ -1110,7 +1112,7 @@ public: return *this; } - TPingTaskBuilder& SetResultSetMetas(const TVector<YandexQuery::ResultSetMeta>& resultSetMetas) + TPingTaskBuilder& SetResultSetMetas(const TVector<FederatedQuery::ResultSetMeta>& resultSetMetas) { ResultSetMetas = resultSetMetas; return *this; @@ -1140,9 +1142,9 @@ public: return *this; } - TPingTaskBuilder& SetResignQuery(bool resignQuery = true) - { - ResignQuery = resignQuery; + TPingTaskBuilder& SetResignQuery(bool resignQuery = true) + { + ResignQuery = resignQuery; return *this; } @@ -1170,14 +1172,14 @@ public: return *this; } - std::unique_ptr<TEvControlPlaneStorage::TEvPingTaskRequest> Build() + std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> Build() { Yq::Private::PingTaskRequest request; request.set_owner_id(Owner); request.mutable_query_id()->set_value(QueryId); request.mutable_result_id()->set_value(ResultId); if (Status) { - request.set_status(*Status); + request.set_status((YandexQuery::QueryMeta::ComputeStatus)*Status); } request.set_status_code(StatusCode); if (Issues) { @@ -1191,7 +1193,9 @@ public: } if (ResultSetMetas) { for (const auto& meta : *ResultSetMetas) { - *request.add_result_set_meta() = meta; + YandexQuery::ResultSetMeta casted; + casted.CopyFrom(meta); + *request.add_result_set_meta() = casted; } } for (const auto& dqGraph : DqGraphs) { @@ -1226,7 +1230,7 @@ public: *request.mutable_finished_at() = NProtoInterop::CastToProto(*FinishedAt); } - return std::make_unique<TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request)); + return std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request)); } }; @@ -1285,7 +1289,7 @@ public: return *this; } - std::unique_ptr<TEvControlPlaneStorage::TEvNodesHealthCheckRequest> Build() + std::unique_ptr<NYq::TEvControlPlaneStorage::TEvNodesHealthCheckRequest> Build() { Yq::Private::NodesHealthCheckRequest request; request.set_tenant(TenantName); @@ -1296,7 +1300,7 @@ public: node.set_active_workers(ActiveWorkers); node.set_memory_limit(MemoryLimit); node.set_memory_allocated(MemoryAllocated); - return std::make_unique<TEvControlPlaneStorage::TEvNodesHealthCheckRequest>(std::move(request)); + return std::make_unique<NYq::TEvControlPlaneStorage::TEvNodesHealthCheckRequest>(std::move(request)); } }; diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h b/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h new file mode 100644 index 00000000000..e0fc67201e1 --- /dev/null +++ b/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h @@ -0,0 +1,1305 @@ +// TODO: remove YQ-1055 + +#pragma once + +#include <util/datetime/base.h> + +#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h> +#include <ydb/library/yql/public/issue/yql_issue_message.h> +#include <ydb/public/api/protos/yq.pb.h> + +#include <ydb/core/yq/libs/control_plane_storage/events/events.h> + +#include <library/cpp/protobuf/interop/cast.h> + +namespace NYq { + +// Queries + +class TCreateQueryBuilder { + YandexQuery::CreateQueryRequest Request; + +public: + TCreateQueryBuilder() + { + SetMode(YandexQuery::RUN); + SetType(YandexQuery::QueryContent::ANALYTICS); + SetName("test_query_name_1"); + SetVisibility(YandexQuery::Acl::SCOPE); + SetText("SELECT 1;"); + } + + TCreateQueryBuilder& SetMode(YandexQuery::ExecuteMode mode) + { + Request.set_execute_mode(mode); + return *this; + } + + TCreateQueryBuilder& SetType(YandexQuery::QueryContent::QueryType type) + { + Request.mutable_content()->set_type(type); + return *this; + } + + TCreateQueryBuilder& SetAutomatic(bool automatic) + { + Request.mutable_content()->set_automatic(automatic); + return *this; + } + + TCreateQueryBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility) + { + Request.mutable_content()->mutable_acl()->set_visibility(visibility); + return *this; + } + + TCreateQueryBuilder& SetText(const TString& content) + { + Request.mutable_content()->set_text(content); + return *this; + } + + TCreateQueryBuilder& SetName(const TString& name) + { + Request.mutable_content()->set_name(name); + return *this; + } + + TCreateQueryBuilder& SetIdempotencyKey(const TString& idempotencyKey) + { + Request.set_idempotency_key(idempotencyKey); + return *this; + } + + TCreateQueryBuilder& SetDisposition(const YandexQuery::StreamingDisposition& disposition) + { + *Request.mutable_disposition() = disposition; + return *this; + } + + TCreateQueryBuilder& ClearAcl() + { + Request.mutable_content()->clear_acl(); + return *this; + } + + const YandexQuery::CreateQueryRequest& Build() + { + return Request; + } +}; + +class TListQueriesBuilder { + YandexQuery::ListQueriesRequest Request; + +public: + TListQueriesBuilder() + { + SetLimit(10); + } + + TListQueriesBuilder& SetPageToken(const TString& pageToken) + { + Request.set_page_token(pageToken); + return *this; + } + + TListQueriesBuilder& SetLimit(int64_t limit) + { + Request.set_limit(limit); + return *this; + } + + const YandexQuery::ListQueriesRequest& Build() + { + return Request; + } +}; + +class TDescribeQueryBuilder { + YandexQuery::DescribeQueryRequest Request; + +public: + TDescribeQueryBuilder& SetQueryId(const TString& queryId) + { + Request.set_query_id(queryId); + return *this; + } + + const YandexQuery::DescribeQueryRequest& Build() + { + return Request; + } +}; + +class TGetQueryStatusBuilder { + YandexQuery::GetQueryStatusRequest Request; + +public: + TGetQueryStatusBuilder& SetQueryId(const TString& queryId) + { + Request.set_query_id(queryId); + return *this; + } + + const YandexQuery::GetQueryStatusRequest& Build() + { + return Request; + } +}; + +class TDeleteQueryBuilder { + YandexQuery::DeleteQueryRequest Request; + +public: + TDeleteQueryBuilder& SetQueryId(const TString& queryId) + { + Request.set_query_id(queryId); + return *this; + } + + TDeleteQueryBuilder& SetIdempotencyKey(const TString& idempotencyKey) + { + Request.set_idempotency_key(idempotencyKey); + return *this; + } + + TDeleteQueryBuilder& SetPreviousRevision(const int64_t periousRevision) + { + Request.set_previous_revision(periousRevision); + return *this; + } + + const YandexQuery::DeleteQueryRequest& Build() + { + return Request; + } +}; + +class TModifyQueryBuilder { + YandexQuery::ModifyQueryRequest Request; + +public: + TModifyQueryBuilder() + { + SetName("test_query_name_2"); + SetMode(YandexQuery::RUN); + SetType(YandexQuery::QueryContent::ANALYTICS); + SetVisibility(YandexQuery::Acl::SCOPE); + SetText("SELECT 1;"); + } + + TModifyQueryBuilder& SetQueryId(const TString& queryId) + { + Request.set_query_id(queryId); + return *this; + } + + TModifyQueryBuilder& SetType(YandexQuery::QueryContent::QueryType type) + { + Request.mutable_content()->set_type(type); + return *this; + } + + TModifyQueryBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility) + { + Request.mutable_content()->mutable_acl()->set_visibility(visibility); + return *this; + } + + TModifyQueryBuilder& SetMode(YandexQuery::ExecuteMode mode) + { + Request.set_execute_mode(mode); + return *this; + } + + TModifyQueryBuilder& SetAutomatic(bool automatic) + { + Request.mutable_content()->set_automatic(automatic); + return *this; + } + + TModifyQueryBuilder& SetText(const TString& content) + { + Request.mutable_content()->set_text(content); + return *this; + } + + TModifyQueryBuilder& SetDisposition(const YandexQuery::StreamingDisposition& disposition) + { + *Request.mutable_disposition() = disposition; + return *this; + } + + TModifyQueryBuilder& SetState(const YandexQuery::StateLoadMode& state) + { + Request.set_state_load_mode(state); + return *this; + } + + TModifyQueryBuilder& SetName(const TString& name) + { + Request.mutable_content()->set_name(name); + return *this; + } + + TModifyQueryBuilder& SetIdempotencyKey(const TString& idempotencyKey) + { + Request.set_idempotency_key(idempotencyKey); + return *this; + } + + TModifyQueryBuilder& SetPreviousRevision(const int64_t periousRevision) + { + Request.set_previous_revision(periousRevision); + return *this; + } + + TModifyQueryBuilder& SetDescription(const TString& description) + { + Request.mutable_content()->set_description(description); + return *this; + } + + const YandexQuery::ModifyQueryRequest& Build() + { + return Request; + } +}; + +class TControlQueryBuilder { + YandexQuery::ControlQueryRequest Request; + +public: + TControlQueryBuilder() + { + SetAction(YandexQuery::ABORT); + } + + TControlQueryBuilder& SetAction(const YandexQuery::QueryAction& action) + { + Request.set_action(action); + return *this; + } + + TControlQueryBuilder& SetQueryId(const TString& queryId) + { + Request.set_query_id(queryId); + return *this; + } + + TControlQueryBuilder& SetIdempotencyKey(const TString& idempotencyKey) + { + Request.set_idempotency_key(idempotencyKey); + return *this; + } + + TControlQueryBuilder& SetPreviousRevision(const int64_t periousRevision) + { + Request.set_previous_revision(periousRevision); + return *this; + } + + const YandexQuery::ControlQueryRequest& Build() + { + return Request; + } +}; + +// Results + +class TGetResultDataBuilder { + YandexQuery::GetResultDataRequest Request; + +public: + TGetResultDataBuilder() + { + SetLimit(10); + } + + TGetResultDataBuilder& SetQueryId(const TString& queryId) + { + Request.set_query_id(queryId); + return *this; + } + + TGetResultDataBuilder& SetResultSetIndex(int64_t resultSetIndex) + { + Request.set_result_set_index(resultSetIndex); + return *this; + } + + TGetResultDataBuilder& SetOffset(int64_t offset) + { + Request.set_offset(offset); + return *this; + } + + TGetResultDataBuilder& SetLimit(int64_t limit) + { + Request.set_limit(limit); + return *this; + } + + const YandexQuery::GetResultDataRequest& Build() + { + return Request; + } +}; + +// Jobs + +class TListJobsBuilder { + YandexQuery::ListJobsRequest Request; + +public: + TListJobsBuilder() + { + SetLimit(10); + } + + TListJobsBuilder& SetQueryId(const TString& queryId) + { + Request.mutable_filter()->set_query_id(queryId); + return *this; + } + + TListJobsBuilder& SetPageToken(const TString& pageToken) + { + Request.set_page_token(pageToken); + return *this; + } + + TListJobsBuilder& SetLimit(int64_t limit) + { + Request.set_limit(limit); + return *this; + } + + const YandexQuery::ListJobsRequest& Build() + { + return Request; + } +}; + +class TDescribeJobBuilder { + YandexQuery::DescribeJobRequest Request; + +public: + TDescribeJobBuilder& SetJobId(const TString& jobId) + { + Request.set_job_id(jobId); + return *this; + } + + const YandexQuery::DescribeJobRequest& Build() + { + return Request; + } +}; + +// Connections + +class TCreateConnectionBuilder { + YandexQuery::CreateConnectionRequest Request; + +public: + TCreateConnectionBuilder() + { + SetName("test_connection_name_1"); + SetVisibility(YandexQuery::Acl::SCOPE); + CreateDataStreams("my_database_id", ""); + } + + TCreateConnectionBuilder& CreateYdb(const TString& database, const TString& endpoint, const TString& serviceAccount) + { + auto& ydb = *Request.mutable_content()->mutable_setting()->mutable_ydb_database(); + if (serviceAccount) { + ydb.mutable_auth()->mutable_service_account()->set_id(serviceAccount); + } else { + ydb.mutable_auth()->mutable_current_iam(); + } + + ydb.set_database(database); + ydb.set_endpoint(endpoint); + return *this; + } + + TCreateConnectionBuilder& CreateYdb(const TString& databaseId, const TString& serviceAccount) + { + auto& ydb = *Request.mutable_content()->mutable_setting()->mutable_ydb_database(); + if (serviceAccount) { + ydb.mutable_auth()->mutable_service_account()->set_id(serviceAccount); + } else { + ydb.mutable_auth()->mutable_current_iam(); + } + + ydb.set_database_id(databaseId); + return *this; + } + + TCreateConnectionBuilder& CreateDataStreams(const TString& databaseId, const TString& serviceAccount) + { + auto& yds = *Request.mutable_content()->mutable_setting()->mutable_data_streams(); + if (serviceAccount) { + yds.mutable_auth()->mutable_service_account()->set_id(serviceAccount); + } else { + yds.mutable_auth()->mutable_current_iam(); + } + + yds.set_database_id(databaseId); + return *this; + } + + TCreateConnectionBuilder& CreateClickHouse(const TString& databaseId, const TString& login, const TString& password, const TString& serviceAccount) + { + auto& ch = *Request.mutable_content()->mutable_setting()->mutable_clickhouse_cluster(); + if (serviceAccount) { + ch.mutable_auth()->mutable_service_account()->set_id(serviceAccount); + } else { + ch.mutable_auth()->mutable_current_iam(); + } + + ch.set_database_id(databaseId); + ch.set_login(login); + ch.set_password(password); + return *this; + } + + TCreateConnectionBuilder& CreateObjectStorage(const TString& bucket, const TString& serviceAccount) + { + auto& os = *Request.mutable_content()->mutable_setting()->mutable_object_storage(); + if (serviceAccount) { + os.mutable_auth()->mutable_service_account()->set_id(serviceAccount); + } else { + os.mutable_auth()->mutable_current_iam(); + } + + os.set_bucket(bucket); + return *this; + } + + TCreateConnectionBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility) + { + Request.mutable_content()->mutable_acl()->set_visibility(visibility); + return *this; + } + + TCreateConnectionBuilder& SetName(const TString& name) + { + Request.mutable_content()->set_name(name); + return *this; + } + + TCreateConnectionBuilder& SetDescription(const TString& description) + { + Request.mutable_content()->set_name(description); + return *this; + } + + TCreateConnectionBuilder& SetIdempotencyKey(const TString& idempotencyKey) + { + Request.set_idempotency_key(idempotencyKey); + return *this; + } + + const YandexQuery::CreateConnectionRequest& Build() + { + return Request; + } +}; + +class TListConnectionsBuilder { + YandexQuery::ListConnectionsRequest Request; + +public: + TListConnectionsBuilder() + { + SetLimit(10); + } + + TListConnectionsBuilder& SetPageToken(const TString& pageToken) + { + Request.set_page_token(pageToken); + return *this; + } + + TListConnectionsBuilder& SetLimit(int64_t limit) + { + Request.set_limit(limit); + return *this; + } + + const YandexQuery::ListConnectionsRequest& Build() + { + return Request; + } +}; + +class TDescribeConnectionBuilder { + YandexQuery::DescribeConnectionRequest Request; + +public: + TDescribeConnectionBuilder& SetConnectionId(const TString& connectionId) + { + Request.set_connection_id(connectionId); + return *this; + } + + const YandexQuery::DescribeConnectionRequest& Build() + { + return Request; + } +}; + +class TModifyConnectionBuilder { + YandexQuery::ModifyConnectionRequest Request; + +public: + TModifyConnectionBuilder() + { + SetName("test_connection_name_2"); + SetVisibility(YandexQuery::Acl::SCOPE); + CreateDataStreams("my_database_id", ""); + } + + TModifyConnectionBuilder& CreateYdb(const TString& databaseId, const TString& serviceAccount) + { + auto& ydb = *Request.mutable_content()->mutable_setting()->mutable_ydb_database(); + if (serviceAccount) { + ydb.mutable_auth()->mutable_service_account()->set_id(serviceAccount); + } else { + ydb.mutable_auth()->mutable_current_iam(); + } + + ydb.set_database_id(databaseId); + return *this; + } + + TModifyConnectionBuilder& CreateDataStreams(const TString& databaseId, const TString& serviceAccount) + { + auto& yds = *Request.mutable_content()->mutable_setting()->mutable_data_streams(); + if (serviceAccount) { + yds.mutable_auth()->mutable_service_account()->set_id(serviceAccount); + } else { + yds.mutable_auth()->mutable_current_iam(); + } + + yds.set_database_id(databaseId); + return *this; + } + + TModifyConnectionBuilder& CreateClickHouse(const TString& databaseId, const TString& login, const TString& password, const TString& serviceAccount) + { + auto& ch = *Request.mutable_content()->mutable_setting()->mutable_clickhouse_cluster(); + if (serviceAccount) { + ch.mutable_auth()->mutable_service_account()->set_id(serviceAccount); + } else { + ch.mutable_auth()->mutable_current_iam(); + } + + ch.set_database_id(databaseId); + ch.set_login(login); + ch.set_password(password); + return *this; + } + + TModifyConnectionBuilder& CreateObjectStorage(const TString& bucket, const TString& serviceAccount) + { + auto& os = *Request.mutable_content()->mutable_setting()->mutable_object_storage(); + if (serviceAccount) { + os.mutable_auth()->mutable_service_account()->set_id(serviceAccount); + } else { + os.mutable_auth()->mutable_current_iam(); + } + + os.set_bucket(bucket); + return *this; + } + + TModifyConnectionBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility) + { + Request.mutable_content()->mutable_acl()->set_visibility(visibility); + return *this; + } + + TModifyConnectionBuilder& SetName(const TString& name) + { + Request.mutable_content()->set_name(name); + return *this; + } + + TModifyConnectionBuilder& SetDescription(const TString& description) + { + Request.mutable_content()->set_description(description); + return *this; + } + + TModifyConnectionBuilder& SetIdempotencyKey(const TString& idempotencyKey) + { + Request.set_idempotency_key(idempotencyKey); + return *this; + } + + TModifyConnectionBuilder& SetConnectionId(const TString& connectionId) + { + Request.set_connection_id(connectionId); + return *this; + } + + TModifyConnectionBuilder& SetPreviousRevision(const int64_t periousRevision) + { + Request.set_previous_revision(periousRevision); + return *this; + } + + const YandexQuery::ModifyConnectionRequest& Build() + { + return Request; + } +}; + +class TDeleteConnectionBuilder { + YandexQuery::DeleteConnectionRequest Request; + +public: + TDeleteConnectionBuilder& SetConnectionId(const TString& connectionId) + { + Request.set_connection_id(connectionId); + return *this; + } + + TDeleteConnectionBuilder& SetIdempotencyKey(const TString& idempotencyKey) + { + Request.set_idempotency_key(idempotencyKey); + return *this; + } + + TDeleteConnectionBuilder& SetPreviousRevision(const int64_t periousRevision) + { + Request.set_previous_revision(periousRevision); + return *this; + } + + const YandexQuery::DeleteConnectionRequest& Build() + { + return Request; + } +}; + +// Bindings + +class TCreateBindingBuilder { + YandexQuery::CreateBindingRequest Request; + +public: + TCreateBindingBuilder() + { + SetName("test_binding_name_1"); + SetVisibility(YandexQuery::Acl::SCOPE); + YandexQuery::DataStreamsBinding binding; + binding.set_stream_name("my_stream"); + binding.set_format("json"); + binding.set_compression("zip"); + auto* column = binding.mutable_schema()->add_column(); + column->set_name("sample_column_name"); + column->mutable_type()->set_type_id(Ydb::Type::UINT64); + CreateDataStreams(binding); + } + + TCreateBindingBuilder& SetConnectionId(const TString& connectionId) + { + Request.mutable_content()->set_connection_id(connectionId); + return *this; + } + + TCreateBindingBuilder& CreateDataStreams(const YandexQuery::DataStreamsBinding& binding) + { + *Request.mutable_content()->mutable_setting()->mutable_data_streams() = binding; + return *this; + } + + TCreateBindingBuilder& CreateObjectStorage(const YandexQuery::ObjectStorageBinding& binding) + { + *Request.mutable_content()->mutable_setting()->mutable_object_storage() = binding; + return *this; + } + + TCreateBindingBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility) + { + Request.mutable_content()->mutable_acl()->set_visibility(visibility); + return *this; + } + + TCreateBindingBuilder& SetName(const TString& name) + { + Request.mutable_content()->set_name(name); + return *this; + } + + TCreateBindingBuilder& SetDescription(const TString& description) + { + Request.mutable_content()->set_name(description); + return *this; + } + + TCreateBindingBuilder& SetIdempotencyKey(const TString& idempotencyKey) + { + Request.set_idempotency_key(idempotencyKey); + return *this; + } + + const YandexQuery::CreateBindingRequest& Build() + { + return Request; + } +}; + +class TListBindingsBuilder { + YandexQuery::ListBindingsRequest Request; + +public: + TListBindingsBuilder() + { + SetLimit(10); + } + + TListBindingsBuilder& SetPageToken(const TString& pageToken) + { + Request.set_page_token(pageToken); + return *this; + } + + TListBindingsBuilder& SetLimit(int64_t limit) + { + Request.set_limit(limit); + return *this; + } + + TListBindingsBuilder& SetConnectionId(const TString& connectionId) + { + Request.mutable_filter()->set_connection_id(connectionId); + return *this; + } + + const YandexQuery::ListBindingsRequest& Build() + { + return Request; + } +}; + +class TDescribeBindingBuilder { + YandexQuery::DescribeBindingRequest Request; + +public: + TDescribeBindingBuilder& SetBindingId(const TString& bindingId) + { + Request.set_binding_id(bindingId); + return *this; + } + + const YandexQuery::DescribeBindingRequest& Build() + { + return Request; + } +}; + +class TModifyBindingBuilder { + YandexQuery::ModifyBindingRequest Request; + +public: + TModifyBindingBuilder() + { + SetName("test_binding_name_2"); + SetVisibility(YandexQuery::Acl::SCOPE); + YandexQuery::DataStreamsBinding binding; + binding.set_stream_name("my_stream"); + binding.set_format("json"); + binding.set_compression("zip"); + auto* column = binding.mutable_schema()->add_column(); + column->set_name("sample_column_name"); + column->mutable_type()->set_type_id(Ydb::Type::UINT64); + CreateDataStreams(binding); + } + + TModifyBindingBuilder& SetConnectionId(const TString& connectionId) + { + Request.mutable_content()->set_connection_id(connectionId); + return *this; + } + + TModifyBindingBuilder& CreateDataStreams(const YandexQuery::DataStreamsBinding& binding) + { + *Request.mutable_content()->mutable_setting()->mutable_data_streams() = binding; + return *this; + } + + TModifyBindingBuilder& CreateObjectStorage(const YandexQuery::ObjectStorageBinding& binding) + { + *Request.mutable_content()->mutable_setting()->mutable_object_storage() = binding; + return *this; + } + + TModifyBindingBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility) + { + Request.mutable_content()->mutable_acl()->set_visibility(visibility); + return *this; + } + + TModifyBindingBuilder& SetName(const TString& name) + { + Request.mutable_content()->set_name(name); + return *this; + } + + TModifyBindingBuilder& SetDescription(const TString& description) + { + Request.mutable_content()->set_name(description); + return *this; + } + + TModifyBindingBuilder& SetIdempotencyKey(const TString& idempotencyKey) + { + Request.set_idempotency_key(idempotencyKey); + return *this; + } + + TModifyBindingBuilder& SetBindingId(const TString& bindingId) + { + Request.set_binding_id(bindingId); + return *this; + } + + TModifyBindingBuilder& SetPreviousRevision(const int64_t periousRevision) + { + Request.set_previous_revision(periousRevision); + return *this; + } + + const YandexQuery::ModifyBindingRequest& Build() + { + return Request; + } +}; + +class TDeleteBindingBuilder { + YandexQuery::DeleteBindingRequest Request; + +public: + TDeleteBindingBuilder& SetBindingId(const TString& bindingId) + { + Request.set_binding_id(bindingId); + return *this; + } + + TDeleteBindingBuilder& SetIdempotencyKey(const TString& idempotencyKey) + { + Request.set_idempotency_key(idempotencyKey); + return *this; + } + + TDeleteBindingBuilder& SetPreviousRevision(const int64_t periousRevision) + { + Request.set_previous_revision(periousRevision); + return *this; + } + + const YandexQuery::DeleteBindingRequest& Build() + { + return Request; + } +}; + +// internal + +class TWriteResultDataBuilder { + TString ResultId; + int32_t ResultSetId = 0; + int64_t StartRowId = 0; + TInstant Deadline; + Ydb::ResultSet ResultSet; + +public: + TWriteResultDataBuilder() + { + SetDeadline(TInstant::Now() + TDuration::Minutes(5)); + Ydb::ResultSet resultSet; + auto& value = *resultSet.add_rows(); + value.set_int64_value(1); + SetResultSet(resultSet); + } + + TWriteResultDataBuilder& SetResultId(const TString& resultId) + { + ResultId = resultId; + return *this; + } + + TWriteResultDataBuilder& SetResultSetIndex(int32_t resultSetId) + { + ResultSetId = resultSetId; + return *this; + } + + TWriteResultDataBuilder& SetStartRowId(int64_t startRowId) + { + StartRowId = startRowId; + return *this; + } + + TWriteResultDataBuilder& SetDeadline(const TInstant& deadline) + { + Deadline = deadline; + return *this; + } + + TWriteResultDataBuilder& SetResultSet(const Ydb::ResultSet& resultSet) + { + ResultSet = resultSet; + return *this; + } + + std::unique_ptr<TEvControlPlaneStorage::TEvWriteResultDataRequest> Build() + { + auto request = std::make_unique<TEvControlPlaneStorage::TEvWriteResultDataRequest>(); + request->Request.mutable_result_id()->set_value(ResultId); + *request->Request.mutable_result_set() = ResultSet; + request->Request.set_result_set_id(ResultSetId); + request->Request.set_offset(StartRowId); + *request->Request.mutable_deadline() = NProtoInterop::CastToProto(Deadline); + return request; + } +}; + +class TGetTaskBuilder { + TString Owner; + TString HostName; + TString TenantName; + +public: + TGetTaskBuilder() + { + SetOwner(DefaultOwner()); + SetHostName("localhost"); + SetTenantName("/root/tenant"); + } + + static TString DefaultOwner() { + return "owner"; + } + + TGetTaskBuilder& SetOwner(const TString& owner) + { + Owner = owner; + return *this; + } + + TGetTaskBuilder& SetHostName(const TString& hostName) + { + HostName = hostName; + return *this; + } + + TGetTaskBuilder& SetTenantName(const TString& tenantName) + { + TenantName = tenantName; + return *this; + } + + std::unique_ptr<TEvControlPlaneStorage::TEvGetTaskRequest> Build() + { + auto request = std::make_unique<TEvControlPlaneStorage::TEvGetTaskRequest>(); + request->Request.set_tenant(TenantName); + request->Request.set_owner_id(Owner); + request->Request.set_host(HostName); + return request; + } +}; + +class TPingTaskBuilder { + TString TenantName; + TString CloudId; + TString Scope; + TString QueryId; + TString ResultId; + TString Owner; + TInstant Deadline; + TMaybe<YandexQuery::QueryMeta::ComputeStatus> Status; + TMaybe<NYql::TIssues> Issues; + TMaybe<NYql::TIssues> TransientIssues; + TMaybe<TString> Statistics; + TMaybe<TVector<YandexQuery::ResultSetMeta>> ResultSetMetas; + TMaybe<TString> Ast; + TMaybe<TString> Plan; + TMaybe<TInstant> StartedAt; + TMaybe<TInstant> FinishedAt; + bool ResignQuery = false; + NYql::NDqProto::StatusIds::StatusCode StatusCode = NYql::NDqProto::StatusIds::UNSPECIFIED; + TVector<NYq::TEvControlPlaneStorage::TTopicConsumer> CreatedTopicConsumers; + TVector<TString> DqGraphs; + i32 DqGraphIndex = 0; + +public: + TPingTaskBuilder() + { + SetDeadline(TInstant::Now() + TDuration::Minutes(5)); + SetTenantName("/root/tenant"); + } + + TPingTaskBuilder& SetTenantName(const TString& tenantName) + { + TenantName = tenantName; + return *this; + } + + TPingTaskBuilder& SetCloudId(const TString& cloudId) + { + CloudId = cloudId; + return *this; + } + + TPingTaskBuilder& SetScope(const TString& scope) + { + Scope = scope; + return *this; + } + + TPingTaskBuilder& SetQueryId(const TString& queryId) + { + QueryId = queryId; + return *this; + } + + TPingTaskBuilder& SetResultId(const TString& resultId) + { + ResultId = resultId; + return *this; + } + + TPingTaskBuilder& SetOwner(const TString& owner) + { + Owner = owner; + return *this; + } + + TPingTaskBuilder& SetDeadline(const TInstant& deadline) + { + Deadline = deadline; + return *this; + } + + TPingTaskBuilder& SetStatus(const YandexQuery::QueryMeta::ComputeStatus& status) + { + Status = status; + return *this; + } + + TPingTaskBuilder& SetIssues(const NYql::TIssues& issues) + { + Issues = issues; + return *this; + } + + TPingTaskBuilder& SetTransientIssues(const NYql::TIssues& issues) + { + TransientIssues = issues; + return *this; + } + + TPingTaskBuilder& SetStatistics(const TString& statistics) + { + Statistics = statistics; + return *this; + } + + TPingTaskBuilder& SetResultSetMetas(const TVector<YandexQuery::ResultSetMeta>& resultSetMetas) + { + ResultSetMetas = resultSetMetas; + return *this; + } + + TPingTaskBuilder& SetAst(const TString& ast) + { + Ast = ast; + return *this; + } + + TPingTaskBuilder& SetPlan(const TString& plan) + { + Plan = plan; + return *this; + } + + TPingTaskBuilder& SetStatedAt(const TInstant& started) + { + StartedAt = started; + return *this; + } + + TPingTaskBuilder& SetFinishedAt(const TInstant& finished) + { + FinishedAt = finished; + return *this; + } + + TPingTaskBuilder& SetResignQuery(bool resignQuery = true) + { + ResignQuery = resignQuery; + return *this; + } + + TPingTaskBuilder& SetStatusCode(NYql::NDqProto::StatusIds::StatusCode statusCode = NYql::NDqProto::StatusIds::UNSPECIFIED) + { + StatusCode = statusCode; + return *this; + } + + TPingTaskBuilder& AddCreatedConsumer(const TString& databaseId, const TString& database, const TString& topicPath, const TString& consumerName, const TString& clusterEndpoint, bool useSsl) + { + CreatedTopicConsumers.emplace_back(NYq::TEvControlPlaneStorage::TTopicConsumer{databaseId, database, topicPath, consumerName, clusterEndpoint, useSsl, "", false}); + return *this; + } + + TPingTaskBuilder& AddDqGraph(const TString& dqGraph) + { + DqGraphs.push_back(dqGraph); + return *this; + } + + TPingTaskBuilder& SetDqGraphIndex(i32 dqGraphIndex) + { + DqGraphIndex = dqGraphIndex; + return *this; + } + + std::unique_ptr<TEvControlPlaneStorage::TEvPingTaskRequest> Build() + { + Yq::Private::PingTaskRequest request; + request.set_owner_id(Owner); + request.mutable_query_id()->set_value(QueryId); + request.mutable_result_id()->set_value(ResultId); + if (Status) { + request.set_status(*Status); + } + request.set_status_code(StatusCode); + if (Issues) { + NYql::IssuesToMessage(*Issues, request.mutable_issues()); + } + if (TransientIssues) { + NYql::IssuesToMessage(*TransientIssues, request.mutable_transient_issues()); + } + if (Statistics) { + request.set_statistics(*Statistics); + } + if (ResultSetMetas) { + for (const auto& meta : *ResultSetMetas) { + *request.add_result_set_meta() = meta; + } + } + for (const auto& dqGraph : DqGraphs) { + request.add_dq_graph(dqGraph); + } + request.set_dq_graph_index(DqGraphIndex); + if (Ast) { + request.set_ast(*Ast); + } + if (Plan) { + request.set_plan(*Plan); + } + request.set_resign_query(ResignQuery); + for (const auto& consumer : CreatedTopicConsumers) { + auto& cons = *request.add_created_topic_consumers(); + cons.set_database_id(consumer.DatabaseId); + cons.set_database(consumer.Database); + cons.set_topic_path(consumer.TopicPath); + cons.set_consumer_name(consumer.ConsumerName); + cons.set_cluster_endpoint(consumer.ClusterEndpoint); + cons.set_use_ssl(consumer.UseSsl); + cons.set_token_name(consumer.TokenName); + cons.set_add_bearer_to_token(consumer.AddBearerToToken); + } + request.set_tenant(TenantName); + request.set_scope(Scope); + *request.mutable_deadline() = NProtoInterop::CastToProto(Deadline); + if (StartedAt) { + *request.mutable_started_at() = NProtoInterop::CastToProto(*StartedAt); + } + if (FinishedAt) { + *request.mutable_finished_at() = NProtoInterop::CastToProto(*FinishedAt); + } + + return std::make_unique<TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request)); + } +}; + +class TNodesHealthCheckBuilder { + TString TenantName; + ui32 NodeId = 0; + TString HostName; + TString InstanceId; + ui64 ActiveWorkers = 0; + ui64 MemoryLimit = 0; + ui64 MemoryAllocated = 0; + +public: + TNodesHealthCheckBuilder() + {} + + TNodesHealthCheckBuilder& SetTenantName(const TString& tenantName) + { + TenantName = tenantName; + return *this; + } + + TNodesHealthCheckBuilder& SetNodeId(const ui32& nodeId) + { + NodeId = nodeId; + return *this; + } + + TNodesHealthCheckBuilder& SetHostName(const TString& hostName) + { + HostName = hostName; + return *this; + } + + TNodesHealthCheckBuilder& SetInstanceId(const TString& instanceId) + { + InstanceId = instanceId; + return *this; + } + + TNodesHealthCheckBuilder& SetActiveWorkers(const ui64& activeWorkers) + { + ActiveWorkers = activeWorkers; + return *this; + } + + TNodesHealthCheckBuilder& SetMemoryLimit(const ui64& memoryLimit) + { + MemoryLimit = memoryLimit; + return *this; + } + + TNodesHealthCheckBuilder& SetMemoryAllocated(const ui64& memoryAllocated) + { + MemoryAllocated = memoryAllocated; + return *this; + } + + std::unique_ptr<TEvControlPlaneStorage::TEvNodesHealthCheckRequest> Build() + { + Yq::Private::NodesHealthCheckRequest request; + request.set_tenant(TenantName); + auto& node = *request.mutable_node(); + node.set_node_id(NodeId); + node.set_instance_id(InstanceId); + node.set_hostname(HostName); + node.set_active_workers(ActiveWorkers); + node.set_memory_limit(MemoryLimit); + node.set_memory_allocated(MemoryAllocated); + return std::make_unique<TEvControlPlaneStorage::TEvNodesHealthCheckRequest>(std::move(request)); + } +}; + +} diff --git a/ydb/core/yq/libs/private_client/CMakeLists.txt b/ydb/core/yq/libs/private_client/CMakeLists.txt index 0f5f23c5b4d..64e19f535e1 100644 --- a/ydb/core/yq/libs/private_client/CMakeLists.txt +++ b/ydb/core/yq/libs/private_client/CMakeLists.txt @@ -25,5 +25,6 @@ target_link_libraries(yq-libs-private_client PUBLIC target_sources(yq-libs-private_client PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/internal_service.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/loopback_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/private_client_fq.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/private_client.cpp ) diff --git a/ydb/core/yq/libs/private_client/private_client_fq.cpp b/ydb/core/yq/libs/private_client/private_client_fq.cpp new file mode 100644 index 00000000000..f3d6992460e --- /dev/null +++ b/ydb/core/yq/libs/private_client/private_client_fq.cpp @@ -0,0 +1,182 @@ +#include "private_client_fq.h" +#include <ydb/library/yql/public/issue/yql_issue.h> +#include <ydb/library/yql/public/issue/yql_issue_message.h> +#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> + +namespace NFq { + +using namespace NYdb; + +class TPrivateClient::TImpl : public TClientImplCommon<TPrivateClient::TImpl> { +public: + TImpl( + std::shared_ptr<TGRpcConnectionsImpl>&& connections, + const TCommonClientSettings& settings, + const NMonitoring::TDynamicCounterPtr& counters) + : TClientImplCommon(std::move(connections), settings) + , Counters(counters->GetSubgroup("subsystem", "private_api")->GetSubgroup("subcomponent", "ClientMetrics")) + , PingTaskTime(Counters->GetHistogram("PingTaskMs", NMonitoring::ExponentialHistogram(10, 2, 50))) + , GetTaskTime(Counters->GetHistogram("GetTaskMs", NMonitoring::ExponentialHistogram(10, 2, 50))) + , WriteTaskResultTime(Counters->GetHistogram("WriteTaskResultMs", NMonitoring::ExponentialHistogram(10, 2, 50))) + , NodesHealthCheckTime(Counters->GetHistogram("NodesHealthCheckMs", NMonitoring::ExponentialHistogram(10, 2, 50))) + {} + + template<class TProtoResult, class TResultWrapper> + auto MakeResultExtractor(NThreading::TPromise<TResultWrapper> promise, const NMonitoring::THistogramPtr& hist, TInstant startedAt) { + return [=, promise = std::move(promise)] + (google::protobuf::Any* any, TPlainStatus status) mutable { + std::unique_ptr<TProtoResult> result; + if (any) { + result.reset(new TProtoResult); + any->UnpackTo(result.get()); + } + + hist->Collect((TInstant::Now() - startedAt).MilliSeconds()); + + promise.SetValue( + TResultWrapper( + TStatus(std::move(status)), + std::move(result))); + }; + } + + TAsyncPingTaskResult PingTask( + Fq::Private::PingTaskRequest&& request, + const TPingTaskSettings& settings) { + const auto startedAt = TInstant::Now(); + auto promise = NThreading::NewPromise<TPingTaskResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + Fq::Private::PingTaskResult, + TPingTaskResult>(std::move(promise), PingTaskTime, startedAt); + + Connections_->RunDeferred< + Fq::Private::V1::FqPrivateTaskService, + Fq::Private::PingTaskRequest, + Fq::Private::PingTaskResponse>( + std::move(request), + std::move(extractor), + &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncPingTask, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncGetTaskResult GetTask( + Fq::Private::GetTaskRequest&& request, + const TGetTaskSettings& settings) { + const auto startedAt = TInstant::Now(); + auto promise = NThreading::NewPromise<TGetTaskResult>(); + auto future = promise.GetFuture(); + auto extractor = MakeResultExtractor< + Fq::Private::GetTaskResult, + TGetTaskResult>(std::move(promise), GetTaskTime, startedAt); + + Connections_->RunDeferred< + Fq::Private::V1::FqPrivateTaskService, + Fq::Private::GetTaskRequest, + Fq::Private::GetTaskResponse>( + std::move(request), + std::move(extractor), + &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncGetTask, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncWriteTaskResult WriteTaskResult( + Fq::Private::WriteTaskResultRequest&& request, + const TWriteTaskResultSettings& settings) { + const auto startedAt = TInstant::Now(); + auto promise = NThreading::NewPromise<TWriteTaskResult>(); + auto future = promise.GetFuture(); + auto extractor = MakeResultExtractor< + Fq::Private::WriteTaskResultResult, + TWriteTaskResult>(std::move(promise), WriteTaskResultTime, startedAt); + + Connections_->RunDeferred< + Fq::Private::V1::FqPrivateTaskService, + Fq::Private::WriteTaskResultRequest, + Fq::Private::WriteTaskResultResponse>( + std::move(request), + std::move(extractor), + &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncWriteTaskResult, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncNodesHealthCheckResult NodesHealthCheck( + Fq::Private::NodesHealthCheckRequest&& request, + const TNodesHealthCheckSettings& settings) { + const auto startedAt = TInstant::Now(); + auto promise = NThreading::NewPromise<TNodesHealthCheckResult>(); + auto future = promise.GetFuture(); + auto extractor = MakeResultExtractor< + Fq::Private::NodesHealthCheckResult, + TNodesHealthCheckResult>(std::move(promise), NodesHealthCheckTime, startedAt); + + Connections_->RunDeferred< + Fq::Private::V1::FqPrivateTaskService, + Fq::Private::NodesHealthCheckRequest, + Fq::Private::NodesHealthCheckResponse>( + std::move(request), + std::move(extractor), + &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncNodesHealthCheck, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } +private: + const NMonitoring::TDynamicCounterPtr Counters; + const NMonitoring::THistogramPtr PingTaskTime; + const NMonitoring::THistogramPtr GetTaskTime; + const NMonitoring::THistogramPtr WriteTaskResultTime; + const NMonitoring::THistogramPtr NodesHealthCheckTime; +}; + +TPrivateClient::TPrivateClient( + const TDriver& driver, + const TCommonClientSettings& settings, + const NMonitoring::TDynamicCounterPtr& counters) + : Impl(new TImpl(CreateInternalInterface(driver), settings, counters)) +{} + +TAsyncPingTaskResult TPrivateClient::PingTask( + Fq::Private::PingTaskRequest&& request, + const TPingTaskSettings& settings) { + return Impl->PingTask(std::move(request), settings); +} + +TAsyncGetTaskResult TPrivateClient::GetTask( + Fq::Private::GetTaskRequest&& request, + const TGetTaskSettings& settings) { + return Impl->GetTask(std::move(request), settings); +} + +TAsyncWriteTaskResult TPrivateClient::WriteTaskResult( + Fq::Private::WriteTaskResultRequest&& request, + const TWriteTaskResultSettings& settings) { + return Impl->WriteTaskResult(std::move(request), settings); +} + +TAsyncNodesHealthCheckResult TPrivateClient::NodesHealthCheck( + Fq::Private::NodesHealthCheckRequest&& request, + const TNodesHealthCheckSettings& settings) { + return Impl->NodesHealthCheck(std::move(request), settings); +} + +} //NFq diff --git a/ydb/core/yq/libs/private_client/private_client_fq.h b/ydb/core/yq/libs/private_client/private_client_fq.h new file mode 100644 index 00000000000..2333821cf35 --- /dev/null +++ b/ydb/core/yq/libs/private_client/private_client_fq.h @@ -0,0 +1,82 @@ +#pragma once + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> +#include <ydb/public/api/grpc/draft/yql_db_v1_fq.grpc.pb.h> +#include <ydb/core/yq/libs/protos/fq_private.pb.h> + +namespace NFq { + +template<class TProtoResult> +class TProtoResultInternalWrapper : public NYdb::TStatus { + friend class TPrivateClient; + +public: + TProtoResultInternalWrapper( + NYdb::TStatus&& status, + std::unique_ptr<TProtoResult> result) + : TStatus(std::move(status)) + , Result(std::move(result)) + { } + +public: + const TProtoResult& GetResult() const { + Y_VERIFY(Result, "Uninitialized result"); + return *Result; + } + + bool IsResultSet() const { + return Result ? true : false; + } + +private: + std::unique_ptr<TProtoResult> Result; +}; + + +using TGetTaskResult = TProtoResultInternalWrapper<Fq::Private::GetTaskResult>; +using TPingTaskResult = TProtoResultInternalWrapper<Fq::Private::PingTaskResult>; +using TWriteTaskResult = TProtoResultInternalWrapper<Fq::Private::WriteTaskResultResult>; +using TNodesHealthCheckResult = TProtoResultInternalWrapper<Fq::Private::NodesHealthCheckResult>; + +using TAsyncGetTaskResult = NThreading::TFuture<TGetTaskResult>; +using TAsyncPingTaskResult = NThreading::TFuture<TPingTaskResult>; +using TAsyncWriteTaskResult = NThreading::TFuture<TWriteTaskResult>; +using TAsyncNodesHealthCheckResult = NThreading::TFuture<TNodesHealthCheckResult>; + +struct TGetTaskSettings : public NYdb::TOperationRequestSettings<TGetTaskSettings> {}; +struct TPingTaskSettings : public NYdb::TOperationRequestSettings<TPingTaskSettings> {}; +struct TWriteTaskResultSettings : public NYdb::TOperationRequestSettings<TWriteTaskResultSettings> {}; +struct TNodesHealthCheckSettings : public NYdb::TOperationRequestSettings<TNodesHealthCheckSettings> {}; + +class TPrivateClient { + class TImpl; + +public: + TPrivateClient( + const NYdb::TDriver& driver, + const NYdb::TCommonClientSettings& settings = NYdb::TCommonClientSettings(), + const NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<NMonitoring::TDynamicCounters>()); + + TAsyncGetTaskResult GetTask( + Fq::Private::GetTaskRequest&& request, + const TGetTaskSettings& settings = TGetTaskSettings()); + + TAsyncPingTaskResult PingTask( + Fq::Private::PingTaskRequest&& request, + const TPingTaskSettings& settings = TPingTaskSettings()); + + TAsyncWriteTaskResult WriteTaskResult( + Fq::Private::WriteTaskResultRequest&& request, + const TWriteTaskResultSettings& settings = TWriteTaskResultSettings()); + + TAsyncNodesHealthCheckResult NodesHealthCheck( + Fq::Private::NodesHealthCheckRequest&& request, + const TNodesHealthCheckSettings& settings = TNodesHealthCheckSettings()); + +private: + std::shared_ptr<TImpl> Impl; +}; + +} // namespace NFq diff --git a/ydb/core/yq/libs/protos/CMakeLists.txt b/ydb/core/yq/libs/protos/CMakeLists.txt index 8761fc37f06..2d6538dedff 100644 --- a/ydb/core/yq/libs/protos/CMakeLists.txt +++ b/ydb/core/yq/libs/protos/CMakeLists.txt @@ -16,6 +16,7 @@ target_link_libraries(yq-libs-protos PUBLIC contrib-libs-protobuf ) target_proto_messages(yq-libs-protos PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/protos/fq_private.proto ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/protos/yq_private.proto ) target_proto_addincls(yq-libs-protos diff --git a/ydb/core/yq/libs/protos/fq_private.proto b/ydb/core/yq/libs/protos/fq_private.proto new file mode 100644 index 00000000000..95f486ae834 --- /dev/null +++ b/ydb/core/yq/libs/protos/fq_private.proto @@ -0,0 +1,168 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package Fq.Private; + +import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto"; + +import "ydb/public/api/protos/ydb_operation.proto"; +import "ydb/public/api/protos/ydb_value.proto"; +import "ydb/public/api/protos/ydb_issue_message.proto"; +import "ydb/public/api/protos/annotations/validation.proto"; + +import "ydb/public/api/protos/fq.proto"; + +import "google/protobuf/timestamp.proto"; + +//////////////////////////////////////////////////////////// + +message GetTaskRequest { + string tenant = 1; + string owner_id = 2; // guid, should be refreshed on node restart + string host = 3; + Ydb.Operations.OperationParams operation_params = 4; + + message Filter { + string query_executor = 1 [(Ydb.length).le = 100]; + } + Filter filter = 5; +} + +message SignedIdentity { + string value = 1; + string signature = 2; +} + +message TopicConsumer { + string database_id = 1; + string database = 2; + string topic_path = 3; + string consumer_name = 4; + string cluster_endpoint = 5; + bool use_ssl = 6; + string token_name = 7; + bool add_bearer_to_token = 8; +} + +message GetTaskResult { + message Task { + // come back later in 10 sec ? + SignedIdentity result_id = 1; + SignedIdentity query_id = 2; + SignedIdentity job_id = 3; + uint64 generation = 4; + + bool streaming = 5; + repeated bytes dq_graph = 6; + // text, connection and binding are empty if dq_graph is not empty + string text = 7; + repeated FederatedQuery.Connection connection = 8; + repeated FederatedQuery.Binding binding = 9; + + string user_token = 10; // IAM token for debug + repeated SignedIdentity service_accounts = 11; + string user_id = 12; + FederatedQuery.QueryContent.QueryType query_type = 13; + string scope = 14; + FederatedQuery.ExecuteMode execute_mode = 15; + FederatedQuery.StateLoadMode state_load_mode = 16; + FederatedQuery.QueryMeta.ComputeStatus status = 17; + repeated FederatedQuery.ResultSetMeta result_set_meta = 18; + repeated TopicConsumer created_topic_consumers = 19; + int32 dq_graph_index = 20; + map<string, string> sensor_labels = 21; + + bool automatic = 22; + string query_name = 23; + google.protobuf.Timestamp deadline = 24; + FederatedQuery.StreamingDisposition disposition = 25; + uint64 result_limit = 26; + } + repeated Task tasks = 1; +} + +message GetTaskResponse { + Ydb.Operations.Operation operation = 1; // GetTaskResult +} + +message PingTaskRequest { + string owner_id = 1; + SignedIdentity query_id = 2; + SignedIdentity job_id = 3; + SignedIdentity result_id = 4; + FederatedQuery.QueryMeta.ComputeStatus status = 5; + NYql.NDqProto.StatusIds.StatusCode status_code = 21; + repeated Ydb.Issue.IssueMessage issues = 6; + repeated Ydb.Issue.IssueMessage transient_issues = 16; + uint32 result_set_count = 7; + string statistics = 8; + repeated FederatedQuery.ResultSetMeta result_set_meta = 9; + string executer_info = 10; + repeated bytes dq_graph = 11; + int32 dq_graph_index = 20; + string ast = 12; + string plan = 13; + bool resign_query = 14; + repeated TopicConsumer created_topic_consumers = 17; + FederatedQuery.StateLoadMode state_load_mode = 18; + FederatedQuery.StreamingDisposition disposition = 19; + Ydb.Operations.OperationParams operation_params = 15; + string scope = 100; + string tenant = 104; + google.protobuf.Timestamp started_at = 101; + google.protobuf.Timestamp finished_at = 102; + google.protobuf.Timestamp deadline = 103; +} + +message PingTaskResult { + FederatedQuery.QueryAction action = 1; +} + +message PingTaskResponse { + Ydb.Operations.Operation operation = 1; // PingTaskResult +} + +message WriteTaskResultRequest { + string owner_id = 1; + SignedIdentity result_id = 2; + Ydb.ResultSet result_set = 3; + uint32 result_set_id = 4; + uint64 offset = 5; + uint64 request_id = 6; + Ydb.Operations.OperationParams operation_params = 7; + google.protobuf.Timestamp deadline = 8; +} + +message WriteTaskResultResult { + uint64 request_id = 1; +} + +message WriteTaskResultResponse { + Ydb.Operations.Operation operation = 1; // WriteRowsResultResult +} + +message NodeInfo { + uint32 node_id = 1; + string instance_id = 2; + string hostname = 3; + uint64 active_workers = 4; + uint64 memory_limit = 5; + uint64 memory_allocated = 6; + uint32 interconnect_port = 7; + string node_address = 8; + string data_center = 9; +} + +message NodesHealthCheckRequest { + string tenant = 1; + NodeInfo node = 2; + Ydb.Operations.OperationParams operation_params = 6; +} + +message NodesHealthCheckResult { + repeated NodeInfo nodes = 1; +} + +message NodesHealthCheckResponse { + Ydb.Operations.Operation operation = 1; // NodesHealthCheckResult +} diff --git a/ydb/public/api/grpc/CMakeLists.txt b/ydb/public/api/grpc/CMakeLists.txt index f3f8fd2ca09..df5495b4370 100644 --- a/ydb/public/api/grpc/CMakeLists.txt +++ b/ydb/public/api/grpc/CMakeLists.txt @@ -19,14 +19,15 @@ target_link_libraries(api-grpc PUBLIC contrib-libs-protobuf ) target_proto_messages(api-grpc PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/fq_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_auth_v1.proto + ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_cms_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_coordination_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_discovery_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_export_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_import_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_monitoring_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_operation_v1.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_cms_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_rate_limiter_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_scheme_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_scripting_v1.proto diff --git a/ydb/public/api/grpc/draft/CMakeLists.txt b/ydb/public/api/grpc/draft/CMakeLists.txt index 2093014f616..f048ca35d81 100644 --- a/ydb/public/api/grpc/draft/CMakeLists.txt +++ b/ydb/public/api/grpc/draft/CMakeLists.txt @@ -29,6 +29,7 @@ target_proto_messages(api-grpc-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_s3_internal_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto + ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/yql_db_v1_fq.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/yql_db_v1.proto ) target_proto_addincls(api-grpc-draft diff --git a/ydb/public/api/grpc/draft/yql_db_v1_fq.proto b/ydb/public/api/grpc/draft/yql_db_v1_fq.proto new file mode 100644 index 00000000000..cc403e93a43 --- /dev/null +++ b/ydb/public/api/grpc/draft/yql_db_v1_fq.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package Fq.Private.V1; + +import "ydb/core/yq/libs/protos/fq_private.proto"; + +service FqPrivateTaskService { + // gets new task + rpc GetTask(Fq.Private.GetTaskRequest) returns (Fq.Private.GetTaskResponse); + + // pings new task (also can update metadata) + rpc PingTask(Fq.Private.PingTaskRequest) returns (Fq.Private.PingTaskResponse); + + // writes rows + rpc WriteTaskResult(Fq.Private.WriteTaskResultRequest) returns (Fq.Private.WriteTaskResultResponse); + + //Nodes + rpc NodesHealthCheck(Fq.Private.NodesHealthCheckRequest) returns (Fq.Private.NodesHealthCheckResponse); +} diff --git a/ydb/public/api/grpc/fq_v1.proto b/ydb/public/api/grpc/fq_v1.proto new file mode 100644 index 00000000000..0b52f8bc037 --- /dev/null +++ b/ydb/public/api/grpc/fq_v1.proto @@ -0,0 +1,79 @@ +syntax = "proto3"; + +package FederatedQuery.V1; +option java_package = "com.federated.query.v1"; + +import "ydb/public/api/protos/fq.proto"; + +service FederatedQueryService { + // Query + // Query is the text of an SQL request, the results of the last run and the state after the last run (partitions offsets, consumer in YDS) + // Create a query object with a given SQL + rpc CreateQuery(FederatedQuery.CreateQueryRequest) returns (FederatedQuery.CreateQueryResponse); + + // Get a list of brief queries objects + rpc ListQueries(FederatedQuery.ListQueriesRequest) returns (FederatedQuery.ListQueriesResponse); + + // Get full information about the object of the query + rpc DescribeQuery(FederatedQuery.DescribeQueryRequest) returns (FederatedQuery.DescribeQueryResponse); + + // Get status of the query + rpc GetQueryStatus(FederatedQuery.GetQueryStatusRequest) returns (FederatedQuery.GetQueryStatusResponse); + + // Change the attributes of the query (acl, name, ...) + rpc ModifyQuery(FederatedQuery.ModifyQueryRequest) returns (FederatedQuery.ModifyQueryResponse); + + // Completely delete the query + rpc DeleteQuery(FederatedQuery.DeleteQueryRequest) returns (FederatedQuery.DeleteQueryResponse); + + // Change the state of the query lifecycle + rpc ControlQuery(FederatedQuery.ControlQueryRequest) returns (FederatedQuery.ControlQueryResponse); + + // Get a results page + rpc GetResultData(FederatedQuery.GetResultDataRequest) returns (FederatedQuery.GetResultDataResponse); + + // Job + // Job - appears immediately after starting the request and contains the request metadata + // Get a list of jobs + rpc ListJobs(FederatedQuery.ListJobsRequest) returns (FederatedQuery.ListJobsResponse); + + // Get information about the job + rpc DescribeJob(FederatedQuery.DescribeJobRequest) returns (FederatedQuery.DescribeJobResponse); + + // Connection + // Connection - entity that describes connection points. This can be imagined as an analogue of a network address. + // Create a connection object (ObjectStorage, YDB, YDS, ...) + rpc CreateConnection(FederatedQuery.CreateConnectionRequest) returns (FederatedQuery.CreateConnectionResponse); + + // Get a list of connections objects + rpc ListConnections(FederatedQuery.ListConnectionsRequest) returns (FederatedQuery.ListConnectionsResponse); + + // Get information about the object of the connection + rpc DescribeConnection(FederatedQuery.DescribeConnectionRequest) returns (FederatedQuery.DescribeConnectionResponse); + + // Change the attributes of the connection + rpc ModifyConnection(FederatedQuery.ModifyConnectionRequest) returns (FederatedQuery.ModifyConnectionResponse); + + // Completely delete the connection + rpc DeleteConnection(FederatedQuery.DeleteConnectionRequest) returns (FederatedQuery.DeleteConnectionResponse); + + // Test the connection (permissions, network, ...) + rpc TestConnection(FederatedQuery.TestConnectionRequest) returns (FederatedQuery.TestConnectionResponse); + + // Binding + // Binding - entity using which a schema is assigned to non-schematic data + // Create a binding object - bind schema with ObjectStorage object or YDS stream + rpc CreateBinding(FederatedQuery.CreateBindingRequest) returns (FederatedQuery.CreateBindingResponse); + + // Get a list of bindings objects + rpc ListBindings(FederatedQuery.ListBindingsRequest) returns (FederatedQuery.ListBindingsResponse); + + // Get information about the object of the binding + rpc DescribeBinding(FederatedQuery.DescribeBindingRequest) returns (FederatedQuery.DescribeBindingResponse); + + // Change the attributes of the binding + rpc ModifyBinding(FederatedQuery.ModifyBindingRequest) returns (FederatedQuery.ModifyBindingResponse); + + // Completely delete the binding + rpc DeleteBinding(FederatedQuery.DeleteBindingRequest) returns (FederatedQuery.DeleteBindingResponse); +} diff --git a/ydb/public/api/protos/CMakeLists.txt b/ydb/public/api/protos/CMakeLists.txt index 797619ffbd7..98e03a01ab5 100644 --- a/ydb/public/api/protos/CMakeLists.txt +++ b/ydb/public/api/protos/CMakeLists.txt @@ -21,6 +21,7 @@ target_proto_messages(api-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/persqueue_error_codes.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_long_tx.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_logstore.proto + ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/fq.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/persqueue_error_codes_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_auth.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_persqueue_v1.proto diff --git a/ydb/public/api/protos/fq.proto b/ydb/public/api/protos/fq.proto new file mode 100644 index 00000000000..2e08f76543c --- /dev/null +++ b/ydb/public/api/protos/fq.proto @@ -0,0 +1,743 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +package FederatedQuery; +option java_package = "com.yandex.query"; +option java_outer_classname = "FederatedQueryProtos"; + +import "ydb/public/api/protos/annotations/sensitive.proto"; +import "ydb/public/api/protos/annotations/validation.proto"; +import "ydb/public/api/protos/ydb_operation.proto"; +import "ydb/public/api/protos/ydb_value.proto"; +import "ydb/public/api/protos/ydb_issue_message.proto"; + +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; +import "google/protobuf/empty.proto"; + +//////////////////////////////////////////////////////////// + +// === Query API === + +// Header: ydb-fq-project => yandexcloud://cloud_id/folder_id + +message Acl { + enum Visibility { + VISIBILITY_UNSPECIFIED = 0; + PRIVATE = 1; // Visibility only for the creator of the entity + SCOPE = 2; // Visibility for subjects within scope + } + Visibility visibility = 1; +} + +message Limits { + // Used only for streaming queries + int64 vcpu_rate_limit = 1 [(Ydb.value) = ">= 0"]; // 0.01 vcpu per second + int64 flow_rate_limit = 2 [(Ydb.value) = ">= 0"]; // Bytes per second + int64 vcpu_time_limit = 3 [(Ydb.value) = ">= 0"]; // Milliseconds per second + + // Used only for analytics queries + int64 max_result_size = 4 [(Ydb.value) = ">= 0"]; // Bytes + int64 max_result_rows = 5 [(Ydb.value) = ">= 0"]; // Count + + // Common limits + int64 memory_limit = 6 [(Ydb.value) = ">= 0"]; // Bytes + google.protobuf.Duration result_ttl = 7; +} + +enum ExecuteMode { + EXECUTE_MODE_UNSPECIFIED = 0; + SAVE = 1; // Save a query without changing its state + PARSE = 2; // Parse the query + COMPILE = 3; // Parse and compile the query + VALIDATE = 4; // Parse, compile and validate the query + EXPLAIN = 5; // High-level query plan that specifies only physical operations and non-temporary table names + RUN = 6; // Do all the previous + execution of the query +} + +enum QueryAction { + QUERY_ACTION_UNSPECIFIED = 0; + PAUSE = 1; // Pause the query, with the possibility of its quick resumption + PAUSE_GRACEFULLY = 2; // Similar to PAUSE, only suspends the query allowing it to pause in checkpoint. Can work for a long time + ABORT = 3; // Stop the query + ABORT_GRACEFULLY = 4; // Similar to ABORT, only stops the query in checkpoint + RESUME = 5; // Resumes the execution of the query. Works only for PAUSE queries +} + +enum StateLoadMode { + STATE_LOAD_MODE_UNSPECIFIED = 0; + EMPTY = 1; // Start the query with an empty state + FROM_LAST_CHECKPOINT = 2; // Start the query with the state that is saved in the last checkpoint +} + +// For streaming queries only +message StreamingDisposition { + message FromTime { + google.protobuf.Timestamp timestamp = 1; + } + + message TimeAgo { + google.protobuf.Duration duration = 1; + } + + message FromLastCheckpoint { + // By default if new query streams set doesn't equal to old query streams set, + // error will occur and query won't be allowed to load offsets for streams for the last checkpoint. + // If this flag is set all offsets that can be matched with previous query checkpoint will be matched. + // Others will use "fresh" streaming disposition. + bool force = 1; + } + + oneof disposition { + google.protobuf.Empty oldest = 1; // Start processing with the oldest offset + google.protobuf.Empty fresh = 2; // Start processing with the fresh offset + FromTime from_time = 3; // Start processing with offset from the specified time + TimeAgo time_ago = 4; // Start processing with offset some time ago + FromLastCheckpoint from_last_checkpoint = 5; // Start processing with offset which corresponds to the last checkpoint + } +} + +// Query information that the subject can change +message QueryContent { + enum QueryType { + QUERY_TYPE_UNSPECIFIED = 0; + ANALYTICS = 1; // Analytical query (used for analytical data processing for example to work with YDB, ClickHouse, ...) + STREAMING = 2; // Streaming query (used for streaming data processing, such as working with YDS) + } + QueryType type = 1; + string name = 2 [(Ydb.length).le = 1024]; + Acl acl = 3; + Limits limits = 4; + string text = 5 [(Ydb.length).le = 102400]; // The text of the query itself + bool automatic = 6; // Is used for queries that are created by automatic systems (robots, jdbc driver, ...) + string description = 7 [(Ydb.length).le = 10240]; // Description of the query, there can be any text + // Specified settings for query's executor + // Well known settings are: + // "executor" - type of executor for this query + map<string, string> execution_settings = 10 [(Ydb.map_key).length.range = {min: 1, max: 100}, (Ydb.length).le = 4096]; +} + +message CommonMeta { + string id = 1 [(Ydb.length).range = {min: 1, max: 1024}]; + string created_by = 2 [(Ydb.length).range = {min: 1, max: 1024}]; + string modified_by = 3 [(Ydb.length).range = {min: 1, max: 1024}]; + google.protobuf.Timestamp created_at = 4; + google.protobuf.Timestamp modified_at = 5; + int64 revision = 6 [(Ydb.value) = ">= 0"]; // Entity version, increases with each change +} + +message QueryMeta { + enum ComputeStatus { + COMPUTE_STATUS_UNSPECIFIED = 0; + STARTING = 1; // Start execution of the action on query + ABORTED_BY_USER = 2; // Query aborted by user + ABORTED_BY_SYSTEM = 3; // Query aborted by system + ABORTING_BY_USER = 4; // Query aborting by user + ABORTING_BY_SYSTEM = 5; // Query aborting by system + RESUMING = 6; // Resuming query execution from PAUSED status + RUNNING = 7; // Query started for execution + COMPLETED = 8; // Query completed successfully + COMPLETING = 12; // Finalizing query before become COMPLETED + FAILED = 9; // Query completed with errors + FAILING = 13; // Finalizing query before become FAILED + PAUSED = 11; // Query paused + PAUSING = 10; // Query starts pausing + } + + CommonMeta common = 1; + google.protobuf.Timestamp started_at = 2; + google.protobuf.Timestamp finished_at = 3; + ExecuteMode execute_mode = 4; + ComputeStatus status = 5; + int64 last_job_query_revision = 6; + string last_job_id = 7; + google.protobuf.Timestamp expire_at = 8; + google.protobuf.Timestamp result_expire_at = 9; + string started_by = 10; + oneof action { + string aborted_by = 11; + string paused_by = 12; + } + // One of the versions of this query has fully saved checkpoint. + // If this flag is not set streaming disposition mode "from last checkpoint" can't be used. + bool has_saved_checkpoints = 13; +} + +message BriefQuery { + QueryContent.QueryType type = 1; + string name = 2 [(Ydb.length).le = 1024]; + QueryMeta meta = 3; + Acl.Visibility visibility = 4; + bool automatic = 5; +} + +message QueryPlan { + string json = 1; // No validation because generated on server side +} + +message QueryAst { + string data = 1; +} + +message ResultSetMeta { + repeated Ydb.Column column = 1; + int64 rows_count = 2 [(Ydb.value) = ">= 0"]; + bool truncated = 3; +} + +message Query { + QueryMeta meta = 1; + QueryContent content = 2; + QueryPlan plan = 3; + repeated Ydb.Issue.IssueMessage issue = 4; + repeated Ydb.Issue.IssueMessage transient_issue = 5; + QueryStatistics statistics = 6; + repeated ResultSetMeta result_set_meta = 7; + QueryAst ast = 8; +} + +message QueryStatistics { + string json = 1; // No validation because generated on server side +} + +// Create a new query +message CreateQueryRequest { + Ydb.Operations.OperationParams operation_params = 1; + QueryContent content = 2; + ExecuteMode execute_mode = 3; + StreamingDisposition disposition = 4; + string idempotency_key = 5 [(Ydb.length).le = 1024]; +} + +message CreateQueryResponse { + Ydb.Operations.Operation operation = 1; // CreateQueryResult +} + +message CreateQueryResult { + string query_id = 1 [(Ydb.length).le = 1024]; +} + +enum AutomaticType { + AUTOMATIC_TYPE_UNSPECIFIED = 0; + AUTOMATIC = 1; + NOT_AUTOMATIC = 2; +} + +// Getting brief information about queries +message ListQueriesRequest { + Ydb.Operations.OperationParams operation_params = 1; + string page_token = 2 [(Ydb.length).le = 1024]; + int32 limit = 3 [(Ydb.value) = "[1; 100]"]; + + message Filter { + QueryContent.QueryType query_type = 1; + repeated QueryMeta.ComputeStatus status = 2 [(Ydb.size).le = 20]; + repeated ExecuteMode mode = 3 [(Ydb.size).le = 20]; + string name = 4 [(Ydb.length).le = 1024]; // queries whose name contains the filter.name substring + bool created_by_me = 5; + Acl.Visibility visibility = 6; + AutomaticType automatic = 7; + } + Filter filter = 4; +} + +message ListQueriesResponse { + Ydb.Operations.Operation operation = 1; // ListQueriesResult +} + +message ListQueriesResult { + repeated BriefQuery query = 1; + string next_page_token = 2 [(Ydb.length).le = 1024]; +} + +// Getting complete information about the query +message DescribeQueryRequest { + Ydb.Operations.OperationParams operation_params = 1; + string query_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; +} + +message DescribeQueryResponse { + Ydb.Operations.Operation operation = 1; // DescribeQueryResult +} + +message DescribeQueryResult { + Query query = 1; +} + +// Getting status of the query +message GetQueryStatusRequest { + Ydb.Operations.OperationParams operation_params = 1; + string query_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; +} + +message GetQueryStatusResponse { + Ydb.Operations.Operation operation = 1; // GetQueryStatusResult +} + +message GetQueryStatusResult { + QueryMeta.ComputeStatus status = 1; + int64 meta_revision = 2; +} + +// Complete removal of query. Recovery of the query after this operation is not possible +message DeleteQueryRequest { + Ydb.Operations.OperationParams operation_params = 1; + string query_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; + int64 previous_revision = 3 [(Ydb.value) = ">= 0"]; + string idempotency_key = 4 [(Ydb.length).le = 1024]; +} + +message DeleteQueryResponse { + Ydb.Operations.Operation operation = 1; // DeleteQueryResult +} + +message DeleteQueryResult { +} + +// Change query information with launch policy option. All fields must be filled in the request. The query changes completely +message ModifyQueryRequest { + Ydb.Operations.OperationParams operation_params = 1; + string query_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; + QueryContent content = 3; + ExecuteMode execute_mode = 4; + StreamingDisposition disposition = 5; + StateLoadMode state_load_mode = 6; + int64 previous_revision = 7 [(Ydb.value) = ">= 0"]; + string idempotency_key = 8 [(Ydb.length).le = 1024]; +} + +message ModifyQueryResponse { + Ydb.Operations.Operation operation = 1; // ModifyQueryResult +} + +message ModifyQueryResult { +} + +// Managing query status (pause, abort, resume, ...) +message ControlQueryRequest { + Ydb.Operations.OperationParams operation_params = 1; + string query_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; + QueryAction action = 3; + int64 previous_revision = 4 [(Ydb.value) = ">= 0"]; + string idempotency_key = 5 [(Ydb.length).le = 1024]; +} + +message ControlQueryResponse { + Ydb.Operations.Operation operation = 1; // ControlQueryResult +} + +message ControlQueryResult { +} + +// === Job API === + +message BriefJob { + CommonMeta meta = 1; + QueryMeta query_meta = 3; + string query_name = 9; + Acl.Visibility visibility = 10; + bool automatic = 11; + google.protobuf.Timestamp expire_at = 12; +} + +message Job { + CommonMeta meta = 1; + string text = 2; + QueryMeta query_meta = 3; + QueryPlan plan = 4; + repeated Ydb.Issue.IssueMessage issue = 5; + QueryStatistics statistics = 6; + repeated ResultSetMeta result_set_meta = 7; + QueryAst ast = 8; + string query_name = 9; + Acl acl = 10; + bool automatic = 11; + google.protobuf.Timestamp expire_at = 12; +} + +// Information about recent query runs +message ListJobsRequest { + Ydb.Operations.OperationParams operation_params = 1; + string page_token = 2 [(Ydb.length).le = 1024]; + int32 limit = 3 [(Ydb.value) = "[1; 100]"]; + string query_id = 5; // deprecated + + message Filter { + string query_id = 1 [(Ydb.length).le = 1024]; + bool created_by_me = 2; + } + Filter filter = 4; +} + +message ListJobsResponse { + Ydb.Operations.Operation operation = 1; // ListJobsResult +} + +message ListJobsResult { + repeated BriefJob job = 1; + string next_page_token = 2 [(Ydb.length).le = 1024]; +} + +// Getting information about the job +message DescribeJobRequest { + Ydb.Operations.OperationParams operation_params = 1; + string job_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; +} + +message DescribeJobResponse { + Ydb.Operations.Operation operation = 1; // DescribeJobResult +} + +message DescribeJobResult { + Job job = 1; +} + +// === Connection API === + +message CurrentIAMTokenAuth { +} + +message NoneAuth { +} + +message ServiceAccountAuth { + string id = 1 [(Ydb.length).le = 1024]; +} + +message IamAuth { + oneof identity { + CurrentIAMTokenAuth current_iam = 1; + ServiceAccountAuth service_account = 2; + NoneAuth none = 3; + } +} + +message DataStreams { + string database_id = 1 [(Ydb.length).le = 1024]; + IamAuth auth = 2; + + // for internal usage + string endpoint = 3 [(Ydb.length).le = 1024]; + string database = 4 [(Ydb.length).le = 1024]; + bool secure = 5; +} + +message Monitoring { + string project = 1 [(Ydb.length).le = 200]; + string cluster = 2 [(Ydb.length).le = 200]; + IamAuth auth = 3; +} + +message YdbDatabase { + string database_id = 1 [(Ydb.length).le = 1024]; + IamAuth auth = 2; + + // for internal usage + string endpoint = 3 [(Ydb.length).le = 1024]; + string database = 4 [(Ydb.length).le = 1024]; + bool secure = 5; +} + +message ClickHouseCluster { + string database_id = 1 [(Ydb.length).le = 1024]; + string login = 2 [(Ydb.length).le = 1024, (Ydb.sensitive) = true]; + string password = 3 [(Ydb.length).le = 1024, (Ydb.sensitive) = true]; + IamAuth auth = 4; + + // for internal usage + string host = 5 [(Ydb.length).le = 1024]; + int32 port = 6 [(Ydb.value) = "[0; 65536]"]; + bool secure = 7; +} + +message ObjectStorageConnection { + string bucket = 1 [(Ydb.length).le = 1024]; + IamAuth auth = 2; +} + +message ConnectionSetting { + enum ConnectionType { + CONNECTION_TYPE_UNSPECIFIED = 0; + YDB_DATABASE = 1; + CLICKHOUSE_CLUSTER = 2; + DATA_STREAMS = 3; + OBJECT_STORAGE = 4; + MONITORING = 5; + } + + oneof connection { + YdbDatabase ydb_database = 1; + ClickHouseCluster clickhouse_cluster = 2; + DataStreams data_streams = 3; + ObjectStorageConnection object_storage = 4; + Monitoring monitoring = 5; + } +} + +message ConnectionContent { + string name = 1 [(Ydb.length).range = {min: 1, max: 1024}]; + ConnectionSetting setting = 2; + Acl acl = 3; + string description = 4 [(Ydb.length).le = 10240]; +} + +message Connection { + ConnectionContent content = 1; + CommonMeta meta = 2; +} + +// Create a new connection +message CreateConnectionRequest { + Ydb.Operations.OperationParams operation_params = 1; + ConnectionContent content = 2; + string idempotency_key = 3 [(Ydb.length).le = 1024]; +} + +message CreateConnectionResponse { + Ydb.Operations.Operation operation = 1; // CreateConnectionResult +} + +message CreateConnectionResult { + string connection_id = 1 [(Ydb.length).range = {min: 1, max: 1024}]; +} + +// Getting information about connections +message ListConnectionsRequest { + Ydb.Operations.OperationParams operation_params = 1; + string page_token = 2 [(Ydb.length).le = 1024]; + int32 limit = 3 [(Ydb.value) = "[1; 100]"]; + + message Filter { + string name = 1 [(Ydb.length).le = 1024]; // connections whose name contains the filter.name substring + bool created_by_me = 2; + ConnectionSetting.ConnectionType connection_type = 3; + } + Filter filter = 4; +} + +message ListConnectionsResponse { + Ydb.Operations.Operation operation = 1; // ListConnectionsResult +} + +message ListConnectionsResult { + repeated Connection connection = 1; + string next_page_token = 2 [(Ydb.length).le = 1024]; +} + +// Getting information about the connection +message DescribeConnectionRequest { + Ydb.Operations.OperationParams operation_params = 1; + string connection_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; +} + +message DescribeConnectionResponse { + Ydb.Operations.Operation operation = 1; // DescribeConnectionResult +} + +message DescribeConnectionResult { + Connection connection = 1; +} + +// Change connection information. All fields must be filled in the request. The connection changes completely +message ModifyConnectionRequest { + Ydb.Operations.OperationParams operation_params = 1; + string connection_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; + ConnectionContent content = 3; + int64 previous_revision = 4 [(Ydb.value) = ">= 0"]; + string idempotency_key = 5 [(Ydb.length).le = 1024]; +} + +message ModifyConnectionResponse { + Ydb.Operations.Operation operation = 1; // ModifyConnectionResult +} + +message ModifyConnectionResult { +} + +// Complete removal of connection. Recovery of the connection after this operation is not possible +message DeleteConnectionRequest { + Ydb.Operations.OperationParams operation_params = 1; + string connection_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; + int64 previous_revision = 3 [(Ydb.value) = ">= 0"]; + string idempotency_key = 4 [(Ydb.length).le = 1024]; +} + +message DeleteConnectionResponse { + Ydb.Operations.Operation operation = 1; // DeleteConnectionResult +} + +message DeleteConnectionResult { +} + +message TestConnectionRequest { + Ydb.Operations.OperationParams operation_params = 1; + ConnectionSetting setting = 2; +} + +message TestConnectionResponse { + Ydb.Operations.Operation operation = 1; // TestConnectionResult +} + +message TestConnectionResult { +} + +// ResultSet API + +// Getting the result of the query execution +message GetResultDataRequest { + Ydb.Operations.OperationParams operation_params = 1; + string query_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; + int32 result_set_index = 3 [(Ydb.value) = ">= 0"]; + int64 offset = 4 [(Ydb.value) = ">= 0"]; + int64 limit = 5 [(Ydb.value) = "[1; 1000]"]; +} + +message GetResultDataResponse { + Ydb.Operations.Operation operation = 1; // GetResultDataResult +} + +message GetResultDataResult { + Ydb.ResultSet result_set = 1; +} + +// Binding API + +message Schema { + repeated Ydb.Column column = 1 [(Ydb.size).le = 100]; +} + +message DataStreamsBinding { + string stream_name = 1 [(Ydb.length).range = {min: 1, max: 1024}]; + string format = 2 [(Ydb.length).le = 1024]; + string compression = 3 [(Ydb.length).le = 1024]; + Schema schema = 4; + map<string, string> format_setting = 5 [(Ydb.size).le = 100]; +} + +message ObjectStorageBinding { + message Subset { + string path_pattern = 1 [(Ydb.length).range = {min: 1, max: 1024}]; + string format = 2 [(Ydb.length).le = 1024]; + map<string, string> format_setting = 3 [(Ydb.size).le = 100]; + string compression = 4 [(Ydb.length).le = 1024]; + Schema schema = 5; + } + + repeated Subset subset = 1; +} + +message BindingSetting { + enum BindingType { + BINDING_TYPE_UNSPECIFIED = 0; + DATA_STREAMS = 1; + OBJECT_STORAGE = 2; + } + + oneof binding { + DataStreamsBinding data_streams = 1; + ObjectStorageBinding object_storage = 2; + } +} + +message BriefBinding { + string name = 1 [(Ydb.length).range = {min: 1, max: 1024}]; + string connection_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; + CommonMeta meta = 3; + BindingSetting.BindingType type = 4; +} + +message BindingContent { + string name = 1 [(Ydb.length).range = {min: 1, max: 1024}]; + string connection_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; + BindingSetting setting = 3; + Acl acl = 4; + string description = 5 [(Ydb.length).le = 10240]; +} + +message Binding { + BindingContent content = 1; + CommonMeta meta = 2; +} + +// Create a new binding +message CreateBindingRequest { + Ydb.Operations.OperationParams operation_params = 1; + BindingContent content = 2; + string idempotency_key = 3 [(Ydb.length).le = 1024]; +} + +message CreateBindingResponse { + Ydb.Operations.Operation operation = 1; // CreateBindingResult +} + +message CreateBindingResult { + string binding_id = 1 [(Ydb.length).range = {min: 1, max: 1024}]; +} + +// Getting information about bindings +message ListBindingsRequest { + Ydb.Operations.OperationParams operation_params = 1; + string page_token = 2 [(Ydb.length).le = 1024]; + int32 limit = 3 [(Ydb.value) = "[1; 100]"]; + + message Filter { + string connection_id = 1 [(Ydb.length).le = 1024]; + string name = 2 [(Ydb.length).le = 1024]; // bindings whose name contains the filter.name substring + bool created_by_me = 3; + } + Filter filter = 4; +} + +message ListBindingsResponse { + Ydb.Operations.Operation operation = 1; // ListBindingsResult +} + +message ListBindingsResult { + repeated BriefBinding binding = 1; + string next_page_token = 2 [(Ydb.length).le = 1024]; +} + +// Getting information about the binding +message DescribeBindingRequest { + Ydb.Operations.OperationParams operation_params = 1; + string binding_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; +} + +message DescribeBindingResponse { + Ydb.Operations.Operation operation = 1; // DescribeBindingResult +} + +message DescribeBindingResult { + Binding binding = 1; +} + +// Change binding information. All fields must be filled in the request. The binding changes completely +message ModifyBindingRequest { + Ydb.Operations.OperationParams operation_params = 1; + string binding_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; + BindingContent content = 3; + int64 previous_revision = 4 [(Ydb.value) = ">= 0"]; + string idempotency_key = 5 [(Ydb.length).le = 1024]; +} + +message ModifyBindingResponse { + Ydb.Operations.Operation operation = 1; // ModifyBindingResult +} + +message ModifyBindingResult { +} + +// Complete removal of binding. Recovery of the binding after this operation is not possible +message DeleteBindingRequest { + Ydb.Operations.OperationParams operation_params = 1; + string binding_id = 2 [(Ydb.length).range = {min: 1, max: 1024}]; + int64 previous_revision = 3 [(Ydb.value) = ">= 0"]; + string idempotency_key = 4 [(Ydb.length).le = 1024]; +} + +message DeleteBindingResponse { + Ydb.Operations.Operation operation = 1; // DeleteBindingResult +} + +message DeleteBindingResult { +} diff --git a/ydb/public/api/protos/yq.proto b/ydb/public/api/protos/yq.proto index 9ecf9026959..82f83acddc9 100644 --- a/ydb/public/api/protos/yq.proto +++ b/ydb/public/api/protos/yq.proto @@ -1,3 +1,5 @@ +// DEPRECATED!! use fq.proto + syntax = "proto3"; option cc_enable_arenas = true; diff --git a/ydb/public/lib/fq/CMakeLists.txt b/ydb/public/lib/fq/CMakeLists.txt new file mode 100644 index 00000000000..06d407b7d9a --- /dev/null +++ b/ydb/public/lib/fq/CMakeLists.txt @@ -0,0 +1,21 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(public-lib-fq) +target_link_libraries(public-lib-fq PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-json + api-grpc-draft + cpp-client-ydb_table +) +target_sources(public-lib-fq PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/public/lib/fq/fq.cpp + ${CMAKE_SOURCE_DIR}/ydb/public/lib/fq/scope.cpp +) diff --git a/ydb/public/lib/fq/fq.cpp b/ydb/public/lib/fq/fq.cpp new file mode 100644 index 00000000000..59c4d5cb22c --- /dev/null +++ b/ydb/public/lib/fq/fq.cpp @@ -0,0 +1,754 @@ +#include "fq.h" + +#define INCLUDE_YDB_INTERNAL_H +#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h> +#undef INCLUDE_YDB_INTERNAL_H + +#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> + +namespace NYdb::NFq { + +using namespace NYdb; + +class TClient::TImpl : public TClientImplCommon<TClient::TImpl> { +public: + TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TCommonClientSettings& settings) + : TClientImplCommon(std::move(connections), settings) {} + + template<class TProtoResult, class TResultWrapper> + auto MakeResultExtractor(NThreading::TPromise<TResultWrapper> promise) { + return [promise = std::move(promise)] + (google::protobuf::Any* any, TPlainStatus status) mutable { + std::unique_ptr<TProtoResult> result; + if (any) { + result.reset(new TProtoResult); + any->UnpackTo(result.get()); + } + + promise.SetValue( + TResultWrapper( + TStatus(std::move(status)), + std::move(result))); + }; + } + + TAsyncCreateQueryResult CreateQuery( + const FederatedQuery::CreateQueryRequest& protoRequest, + const TCreateQuerySettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::CreateQueryRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TCreateQueryResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::CreateQueryResult, + TCreateQueryResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::CreateQueryRequest, + FederatedQuery::CreateQueryResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncCreateQuery, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncListQueriesResult ListQueries( + const FederatedQuery::ListQueriesRequest& protoRequest, + const TListQueriesSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::ListQueriesRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TListQueriesResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::ListQueriesResult, + TListQueriesResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::ListQueriesRequest, + FederatedQuery::ListQueriesResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncListQueries, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncDescribeQueryResult DescribeQuery( + const FederatedQuery::DescribeQueryRequest& protoRequest, + const TDescribeQuerySettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::DescribeQueryRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TDescribeQueryResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::DescribeQueryResult, + TDescribeQueryResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::DescribeQueryRequest, + FederatedQuery::DescribeQueryResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncDescribeQuery, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncGetQueryStatusResult GetQueryStatus( + const FederatedQuery::GetQueryStatusRequest& protoRequest, + const TGetQueryStatusSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::GetQueryStatusRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TGetQueryStatusResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::GetQueryStatusResult, + TGetQueryStatusResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::GetQueryStatusRequest, + FederatedQuery::GetQueryStatusResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncGetQueryStatus, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncModifyQueryResult ModifyQuery( + const FederatedQuery::ModifyQueryRequest& protoRequest, + const TModifyQuerySettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::ModifyQueryRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TModifyQueryResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::ModifyQueryResult, + TModifyQueryResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::ModifyQueryRequest, + FederatedQuery::ModifyQueryResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncModifyQuery, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncDeleteQueryResult DeleteQuery( + const FederatedQuery::DeleteQueryRequest& protoRequest, + const TDeleteQuerySettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::DeleteQueryRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TDeleteQueryResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::DeleteQueryResult, + TDeleteQueryResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::DeleteQueryRequest, + FederatedQuery::DeleteQueryResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncDeleteQuery, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncControlQueryResult ControlQuery( + const FederatedQuery::ControlQueryRequest& protoRequest, + const TControlQuerySettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::ControlQueryRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TControlQueryResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::ControlQueryResult, + TControlQueryResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::ControlQueryRequest, + FederatedQuery::ControlQueryResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncControlQuery, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncGetResultDataResult GetResultData( + const FederatedQuery::GetResultDataRequest& protoRequest, + const TGetResultDataSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::GetResultDataRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TGetResultDataResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::GetResultDataResult, + TGetResultDataResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::GetResultDataRequest, + FederatedQuery::GetResultDataResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncGetResultData, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncListJobsResult ListJobs( + const FederatedQuery::ListJobsRequest& protoRequest, + const TListJobsSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::ListJobsRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TListJobsResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::ListJobsResult, + TListJobsResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::ListJobsRequest, + FederatedQuery::ListJobsResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncListJobs, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncDescribeJobResult DescribeJob( + const FederatedQuery::DescribeJobRequest& protoRequest, + const TDescribeJobSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::DescribeJobRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TDescribeJobResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::DescribeJobResult, + TDescribeJobResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::DescribeJobRequest, + FederatedQuery::DescribeJobResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncDescribeJob, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncCreateConnectionResult CreateConnection( + const FederatedQuery::CreateConnectionRequest& protoRequest, + const TCreateConnectionSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::CreateConnectionRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TCreateConnectionResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::CreateConnectionResult, + TCreateConnectionResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::CreateConnectionRequest, + FederatedQuery::CreateConnectionResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncCreateConnection, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncListConnectionsResult ListConnections( + const FederatedQuery::ListConnectionsRequest& protoRequest, + const TListConnectionsSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::ListConnectionsRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TListConnectionsResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::ListConnectionsResult, + TListConnectionsResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::ListConnectionsRequest, + FederatedQuery::ListConnectionsResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncListConnections, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncDescribeConnectionResult DescribeConnection( + const FederatedQuery::DescribeConnectionRequest& protoRequest, + const TDescribeConnectionSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::DescribeConnectionRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TDescribeConnectionResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::DescribeConnectionResult, + TDescribeConnectionResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::DescribeConnectionRequest, + FederatedQuery::DescribeConnectionResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncDescribeConnection, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncModifyConnectionResult ModifyConnection( + const FederatedQuery::ModifyConnectionRequest& protoRequest, + const TModifyConnectionSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::ModifyConnectionRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TModifyConnectionResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::ModifyConnectionResult, + TModifyConnectionResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::ModifyConnectionRequest, + FederatedQuery::ModifyConnectionResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncModifyConnection, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncDeleteConnectionResult DeleteConnection( + const FederatedQuery::DeleteConnectionRequest& protoRequest, + const TDeleteConnectionSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::DeleteConnectionRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TDeleteConnectionResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::DeleteConnectionResult, + TDeleteConnectionResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::DeleteConnectionRequest, + FederatedQuery::DeleteConnectionResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncDeleteConnection, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncTestConnectionResult TestConnection( + const FederatedQuery::TestConnectionRequest& protoRequest, + const TTestConnectionSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::TestConnectionRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TTestConnectionResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::TestConnectionResult, + TTestConnectionResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::TestConnectionRequest, + FederatedQuery::TestConnectionResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncTestConnection, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncCreateBindingResult CreateBinding( + const FederatedQuery::CreateBindingRequest& protoRequest, + const TCreateBindingSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::CreateBindingRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TCreateBindingResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::CreateBindingResult, + TCreateBindingResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::CreateBindingRequest, + FederatedQuery::CreateBindingResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncCreateBinding, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncListBindingsResult ListBindings( + const FederatedQuery::ListBindingsRequest& protoRequest, + const TListBindingsSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::ListBindingsRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TListBindingsResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::ListBindingsResult, + TListBindingsResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::ListBindingsRequest, + FederatedQuery::ListBindingsResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncListBindings, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncDescribeBindingResult DescribeBinding( + const FederatedQuery::DescribeBindingRequest& protoRequest, + const TDescribeBindingSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::DescribeBindingRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TDescribeBindingResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::DescribeBindingResult, + TDescribeBindingResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::DescribeBindingRequest, + FederatedQuery::DescribeBindingResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncDescribeBinding, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncModifyBindingResult ModifyBinding( + const FederatedQuery::ModifyBindingRequest& protoRequest, + const TModifyBindingSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::ModifyBindingRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TModifyBindingResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::ModifyBindingResult, + TModifyBindingResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::ModifyBindingRequest, + FederatedQuery::ModifyBindingResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncModifyBinding, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } + + TAsyncDeleteBindingResult DeleteBinding( + const FederatedQuery::DeleteBindingRequest& protoRequest, + const TDeleteBindingSettings& settings) { + auto request = MakeOperationRequest<FederatedQuery::DeleteBindingRequest>(settings); + request = protoRequest; + + auto promise = NThreading::NewPromise<TDeleteBindingResult>(); + auto future = promise.GetFuture(); + + auto extractor = MakeResultExtractor< + FederatedQuery::DeleteBindingResult, + TDeleteBindingResult>(std::move(promise)); + + Connections_->RunDeferred< + FederatedQuery::V1::FederatedQueryService, + FederatedQuery::DeleteBindingRequest, + FederatedQuery::DeleteBindingResponse>( + std::move(request), + std::move(extractor), + &FederatedQuery::V1::FederatedQueryService::Stub::AsyncDeleteBinding, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings), + settings.ClientTimeout_); + + return future; + } +}; + +TClient::TClient(const TDriver& driver, const TCommonClientSettings& settings) + : Impl_(new TImpl(CreateInternalInterface(driver), settings)) +{} + +TAsyncCreateQueryResult TClient::CreateQuery( + const FederatedQuery::CreateQueryRequest& request, + const TCreateQuerySettings& settings) { + return Impl_->CreateQuery(request, settings); +} + +TAsyncListQueriesResult TClient::ListQueries( + const FederatedQuery::ListQueriesRequest& request, + const TListQueriesSettings& settings) { + return Impl_->ListQueries(request, settings); +} + +TAsyncDescribeQueryResult TClient::DescribeQuery( + const FederatedQuery::DescribeQueryRequest& request, + const TDescribeQuerySettings& settings) { + return Impl_->DescribeQuery(request, settings); +} + +TAsyncGetQueryStatusResult TClient::GetQueryStatus( + const FederatedQuery::GetQueryStatusRequest& request, + const TGetQueryStatusSettings& settings) { + return Impl_->GetQueryStatus(request, settings); +} + +TAsyncModifyQueryResult TClient::ModifyQuery( + const FederatedQuery::ModifyQueryRequest& request, + const TModifyQuerySettings& settings) { + return Impl_->ModifyQuery(request, settings); +} + +TAsyncDeleteQueryResult TClient::DeleteQuery( + const FederatedQuery::DeleteQueryRequest& request, + const TDeleteQuerySettings& settings) { + return Impl_->DeleteQuery(request, settings); +} + +TAsyncControlQueryResult TClient::ControlQuery( + const FederatedQuery::ControlQueryRequest& request, + const TControlQuerySettings& settings) { + return Impl_->ControlQuery(request, settings); +} + +TAsyncGetResultDataResult TClient::GetResultData( + const FederatedQuery::GetResultDataRequest& request, + const TGetResultDataSettings& settings) { + return Impl_->GetResultData(request, settings); +} + +TAsyncListJobsResult TClient::ListJobs( + const FederatedQuery::ListJobsRequest& request, + const TListJobsSettings& settings) { + return Impl_->ListJobs(request, settings); +} + +TAsyncDescribeJobResult TClient::DescribeJob( + const FederatedQuery::DescribeJobRequest& request, + const TDescribeJobSettings& settings) { + return Impl_->DescribeJob(request, settings); +} + +TAsyncCreateConnectionResult TClient::CreateConnection( + const FederatedQuery::CreateConnectionRequest& request, + const TCreateConnectionSettings& settings) { + return Impl_->CreateConnection(request, settings); +} + +TAsyncListConnectionsResult TClient::ListConnections( + const FederatedQuery::ListConnectionsRequest& request, + const TListConnectionsSettings& settings) { + return Impl_->ListConnections(request, settings); +} + +TAsyncDescribeConnectionResult TClient::DescribeConnection( + const FederatedQuery::DescribeConnectionRequest& request, + const TDescribeConnectionSettings& settings) { + return Impl_->DescribeConnection(request, settings); +} + +TAsyncModifyConnectionResult TClient::ModifyConnection( + const FederatedQuery::ModifyConnectionRequest& request, + const TModifyConnectionSettings& settings) { + return Impl_->ModifyConnection(request, settings); +} + +TAsyncDeleteConnectionResult TClient::DeleteConnection( + const FederatedQuery::DeleteConnectionRequest& request, + const TDeleteConnectionSettings& settings) { + return Impl_->DeleteConnection(request, settings); +} + +TAsyncTestConnectionResult TClient::TestConnection( + const FederatedQuery::TestConnectionRequest& request, + const TTestConnectionSettings& settings) { + return Impl_->TestConnection(request, settings); +} + +TAsyncCreateBindingResult TClient::CreateBinding( + const FederatedQuery::CreateBindingRequest& request, + const TCreateBindingSettings& settings) { + return Impl_->CreateBinding(request, settings); +} + +TAsyncListBindingsResult TClient::ListBindings( + const FederatedQuery::ListBindingsRequest& request, + const TListBindingsSettings& settings) { + return Impl_->ListBindings(request, settings); +} + +TAsyncDescribeBindingResult TClient::DescribeBinding( + const FederatedQuery::DescribeBindingRequest& request, + const TDescribeBindingSettings& settings) { + return Impl_->DescribeBinding(request, settings); +} + +TAsyncModifyBindingResult TClient::ModifyBinding( + const FederatedQuery::ModifyBindingRequest& request, + const TModifyBindingSettings& settings) { + return Impl_->ModifyBinding(request, settings); +} + +TAsyncDeleteBindingResult TClient::DeleteBinding( + const FederatedQuery::DeleteBindingRequest& request, + const TDeleteBindingSettings& settings) { + return Impl_->DeleteBinding(request, settings); +} + +} diff --git a/ydb/public/lib/fq/fq.h b/ydb/public/lib/fq/fq.h new file mode 100644 index 00000000000..e2d3405fbb6 --- /dev/null +++ b/ydb/public/lib/fq/fq.h @@ -0,0 +1,215 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +#include <ydb/public/api/grpc/fq_v1.grpc.pb.h> + +namespace NYdb::NFq { + +template<class TProtoResult> +class TProtoResultWrapper : public NYdb::TStatus { + friend class TClient; + +private: + TProtoResultWrapper( + NYdb::TStatus&& status, + std::unique_ptr<TProtoResult> result) + : TStatus(std::move(status)) + , Result(std::move(result)) + { } + +public: + const TProtoResult& GetResult() const { + if (!Result) { + ythrow yexception() << "Uninitialized result: " << GetIssues().ToString(); + } + return *Result; + } + + bool HasResult() const { + return Result; + } + +private: + std::unique_ptr<TProtoResult> Result; +}; + +using TCreateQueryResult = TProtoResultWrapper<FederatedQuery::CreateQueryResult>; +using TAsyncCreateQueryResult = NThreading::TFuture<TCreateQueryResult>; +struct TCreateQuerySettings : public NYdb::TOperationRequestSettings<TCreateQuerySettings> {}; + +using TListQueriesResult = TProtoResultWrapper<FederatedQuery::ListQueriesResult>; +using TAsyncListQueriesResult = NThreading::TFuture<TListQueriesResult>; +struct TListQueriesSettings : public NYdb::TOperationRequestSettings<TListQueriesSettings> {}; + +using TDescribeQueryResult = TProtoResultWrapper<FederatedQuery::DescribeQueryResult>; +using TAsyncDescribeQueryResult = NThreading::TFuture<TDescribeQueryResult>; +struct TDescribeQuerySettings : public NYdb::TOperationRequestSettings<TDescribeQuerySettings> {}; + +using TGetQueryStatusResult = TProtoResultWrapper<FederatedQuery::GetQueryStatusResult>; +using TAsyncGetQueryStatusResult = NThreading::TFuture<TGetQueryStatusResult>; +struct TGetQueryStatusSettings : public NYdb::TOperationRequestSettings<TGetQueryStatusSettings> {}; + +using TModifyQueryResult = TProtoResultWrapper<FederatedQuery::ModifyQueryResult>; +using TAsyncModifyQueryResult = NThreading::TFuture<TModifyQueryResult>; +struct TModifyQuerySettings : public NYdb::TOperationRequestSettings<TModifyQuerySettings> {}; + +using TDeleteQueryResult = TProtoResultWrapper<FederatedQuery::DeleteQueryResult>; +using TAsyncDeleteQueryResult = NThreading::TFuture<TDeleteQueryResult>; +struct TDeleteQuerySettings : public NYdb::TOperationRequestSettings<TDeleteQuerySettings> {}; + +using TControlQueryResult = TProtoResultWrapper<FederatedQuery::ControlQueryResult>; +using TAsyncControlQueryResult = NThreading::TFuture<TControlQueryResult>; +struct TControlQuerySettings : public NYdb::TOperationRequestSettings<TControlQuerySettings> {}; + +using TGetResultDataResult = TProtoResultWrapper<FederatedQuery::GetResultDataResult>; +using TAsyncGetResultDataResult = NThreading::TFuture<TGetResultDataResult>; +struct TGetResultDataSettings : public NYdb::TOperationRequestSettings<TGetResultDataSettings> {}; + +using TListJobsResult = TProtoResultWrapper<FederatedQuery::ListJobsResult>; +using TAsyncListJobsResult = NThreading::TFuture<TListJobsResult>; +struct TListJobsSettings : public NYdb::TOperationRequestSettings<TListJobsSettings> {}; + +using TDescribeJobResult = TProtoResultWrapper<FederatedQuery::DescribeJobResult>; +using TAsyncDescribeJobResult = NThreading::TFuture<TDescribeJobResult>; +struct TDescribeJobSettings : public NYdb::TOperationRequestSettings<TDescribeJobSettings> {}; + +using TCreateConnectionResult = TProtoResultWrapper<FederatedQuery::CreateConnectionResult>; +using TAsyncCreateConnectionResult = NThreading::TFuture<TCreateConnectionResult>; +struct TCreateConnectionSettings : public NYdb::TOperationRequestSettings<TCreateConnectionSettings> {}; + +using TListConnectionsResult = TProtoResultWrapper<FederatedQuery::ListConnectionsResult>; +using TAsyncListConnectionsResult = NThreading::TFuture<TListConnectionsResult>; +struct TListConnectionsSettings : public NYdb::TOperationRequestSettings<TListConnectionsSettings> {}; + +using TDescribeConnectionResult = TProtoResultWrapper<FederatedQuery::DescribeConnectionResult>; +using TAsyncDescribeConnectionResult = NThreading::TFuture<TDescribeConnectionResult>; +struct TDescribeConnectionSettings : public NYdb::TOperationRequestSettings<TDescribeConnectionSettings> {}; + +using TModifyConnectionResult = TProtoResultWrapper<FederatedQuery::ModifyConnectionResult>; +using TAsyncModifyConnectionResult = NThreading::TFuture<TModifyConnectionResult>; +struct TModifyConnectionSettings : public NYdb::TOperationRequestSettings<TModifyConnectionSettings> {}; + +using TDeleteConnectionResult = TProtoResultWrapper<FederatedQuery::DeleteConnectionResult>; +using TAsyncDeleteConnectionResult = NThreading::TFuture<TDeleteConnectionResult>; +struct TDeleteConnectionSettings : public NYdb::TOperationRequestSettings<TDeleteConnectionSettings> {}; + +using TTestConnectionResult = TProtoResultWrapper<FederatedQuery::TestConnectionResult>; +using TAsyncTestConnectionResult = NThreading::TFuture<TTestConnectionResult>; +struct TTestConnectionSettings : public NYdb::TOperationRequestSettings<TTestConnectionSettings> {}; + +using TCreateBindingResult = TProtoResultWrapper<FederatedQuery::CreateBindingResult>; +using TAsyncCreateBindingResult = NThreading::TFuture<TCreateBindingResult>; +struct TCreateBindingSettings : public NYdb::TOperationRequestSettings<TCreateBindingSettings> {}; + +using TListBindingsResult = TProtoResultWrapper<FederatedQuery::ListBindingsResult>; +using TAsyncListBindingsResult = NThreading::TFuture<TListBindingsResult>; +struct TListBindingsSettings : public NYdb::TOperationRequestSettings<TListBindingsSettings> {}; + +using TDescribeBindingResult = TProtoResultWrapper<FederatedQuery::DescribeBindingResult>; +using TAsyncDescribeBindingResult = NThreading::TFuture<TDescribeBindingResult>; +struct TDescribeBindingSettings : public NYdb::TOperationRequestSettings<TDescribeBindingSettings> {}; + +using TModifyBindingResult = TProtoResultWrapper<FederatedQuery::ModifyBindingResult>; +using TAsyncModifyBindingResult = NThreading::TFuture<TModifyBindingResult>; +struct TModifyBindingSettings : public NYdb::TOperationRequestSettings<TModifyBindingSettings> {}; + +using TDeleteBindingResult = TProtoResultWrapper<FederatedQuery::DeleteBindingResult>; +using TAsyncDeleteBindingResult = NThreading::TFuture<TDeleteBindingResult>; +struct TDeleteBindingSettings : public NYdb::TOperationRequestSettings<TDeleteBindingSettings> {}; + +class TClient { + class TImpl; + +public: + TClient(const NYdb::TDriver& driver, const NYdb::TCommonClientSettings& settings = NYdb::TCommonClientSettings()); + + TAsyncCreateQueryResult CreateQuery( + const FederatedQuery::CreateQueryRequest& request, + const TCreateQuerySettings& settings = TCreateQuerySettings()); + + TAsyncListQueriesResult ListQueries( + const FederatedQuery::ListQueriesRequest& request, + const TListQueriesSettings& settings = TListQueriesSettings()); + + TAsyncDescribeQueryResult DescribeQuery( + const FederatedQuery::DescribeQueryRequest& request, + const TDescribeQuerySettings& settings = TDescribeQuerySettings()); + + TAsyncGetQueryStatusResult GetQueryStatus( + const FederatedQuery::GetQueryStatusRequest& request, + const TGetQueryStatusSettings& settings = TGetQueryStatusSettings()); + + TAsyncModifyQueryResult ModifyQuery( + const FederatedQuery::ModifyQueryRequest& request, + const TModifyQuerySettings& settings = TModifyQuerySettings()); + + TAsyncDeleteQueryResult DeleteQuery( + const FederatedQuery::DeleteQueryRequest& request, + const TDeleteQuerySettings& settings = TDeleteQuerySettings()); + + TAsyncControlQueryResult ControlQuery( + const FederatedQuery::ControlQueryRequest& request, + const TControlQuerySettings& settings = TControlQuerySettings()); + + TAsyncGetResultDataResult GetResultData( + const FederatedQuery::GetResultDataRequest& request, + const TGetResultDataSettings& settings = TGetResultDataSettings()); + + TAsyncListJobsResult ListJobs( + const FederatedQuery::ListJobsRequest& request, + const TListJobsSettings& settings = TListJobsSettings()); + + TAsyncDescribeJobResult DescribeJob( + const FederatedQuery::DescribeJobRequest& request, + const TDescribeJobSettings& settings = TDescribeJobSettings()); + + TAsyncCreateConnectionResult CreateConnection( + const FederatedQuery::CreateConnectionRequest& request, + const TCreateConnectionSettings& settings = TCreateConnectionSettings()); + + TAsyncListConnectionsResult ListConnections( + const FederatedQuery::ListConnectionsRequest& request, + const TListConnectionsSettings& settings = TListConnectionsSettings()); + + TAsyncDescribeConnectionResult DescribeConnection( + const FederatedQuery::DescribeConnectionRequest& request, + const TDescribeConnectionSettings& settings = TDescribeConnectionSettings()); + + TAsyncModifyConnectionResult ModifyConnection( + const FederatedQuery::ModifyConnectionRequest& request, + const TModifyConnectionSettings& settings = TModifyConnectionSettings()); + + TAsyncDeleteConnectionResult DeleteConnection( + const FederatedQuery::DeleteConnectionRequest& request, + const TDeleteConnectionSettings& settings = TDeleteConnectionSettings()); + + TAsyncTestConnectionResult TestConnection( + const FederatedQuery::TestConnectionRequest& request, + const TTestConnectionSettings& settings = TTestConnectionSettings()); + + TAsyncCreateBindingResult CreateBinding( + const FederatedQuery::CreateBindingRequest& request, + const TCreateBindingSettings& settings = TCreateBindingSettings()); + + TAsyncListBindingsResult ListBindings( + const FederatedQuery::ListBindingsRequest& request, + const TListBindingsSettings& settings = TListBindingsSettings()); + + TAsyncDescribeBindingResult DescribeBinding( + const FederatedQuery::DescribeBindingRequest& request, + const TDescribeBindingSettings& settings = TDescribeBindingSettings()); + + TAsyncModifyBindingResult ModifyBinding( + const FederatedQuery::ModifyBindingRequest& request, + const TModifyBindingSettings& settings = TModifyBindingSettings()); + + TAsyncDeleteBindingResult DeleteBinding( + const FederatedQuery::DeleteBindingRequest& request, + const TDeleteBindingSettings& settings = TDeleteBindingSettings()); + +private: + std::shared_ptr<TImpl> Impl_; +}; + +} // namespace NYdb::NFq diff --git a/ydb/public/lib/fq/helpers.h b/ydb/public/lib/fq/helpers.h new file mode 100644 index 00000000000..7d84a2c1b33 --- /dev/null +++ b/ydb/public/lib/fq/helpers.h @@ -0,0 +1,25 @@ +#pragma once + +#include "scope.h" + +namespace NYdb { +namespace NFq { + +template<typename T> +T CreateFqSettings(const TScope& scope) +{ + T settings; + settings.Header_ = { + { "x-ydb-fq-project", scope.ToString() } + }; + return settings; +} + +template<typename T> +T CreateFqSettings(const TString& folderId) +{ + return CreateFqSettings<T>(TScope{TScope::YandexCloudScopeSchema + "://" + folderId}); +} + +} // namespace NFq +} // namespace Ndb diff --git a/ydb/public/lib/fq/scope.cpp b/ydb/public/lib/fq/scope.cpp new file mode 100644 index 00000000000..fac8789e5f5 --- /dev/null +++ b/ydb/public/lib/fq/scope.cpp @@ -0,0 +1,9 @@ +#include "scope.h" + +namespace NYdb { +namespace NFq { + +TString TScope::YandexCloudScopeSchema = "yandexcloud"; + +} // namespace NFq +} // namespace Ndb diff --git a/ydb/public/lib/fq/scope.h b/ydb/public/lib/fq/scope.h new file mode 100644 index 00000000000..4d54ea0a8e4 --- /dev/null +++ b/ydb/public/lib/fq/scope.h @@ -0,0 +1,36 @@ +#pragma once + +#include <util/string/builder.h> + +namespace NYdb { +namespace NFq { + +class TScope { +public: + static TString YandexCloudScopeSchema; + + TScope() + { } + + TScope(const TString& scope) + : Scope(scope) + { } + + TScope(TString&& scope) + : Scope(std::move(scope)) + { } + + const TString& ToString() const { + return Scope; + } + + bool Empty() const { + return Scope.empty(); + } + +private: + TString Scope; +}; + +} // namespace NFq +} // namespace Ndb diff --git a/ydb/services/fq/CMakeLists.txt b/ydb/services/fq/CMakeLists.txt new file mode 100644 index 00000000000..75abc50a008 --- /dev/null +++ b/ydb/services/fq/CMakeLists.txt @@ -0,0 +1,24 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(ydb-services-fq) +target_link_libraries(ydb-services-fq PUBLIC + contrib-libs-cxxsupp + yutil + cpp-grpc-server + library-cpp-retry + ydb-core-grpc_services + core-grpc_services-base + ydb-library-protobuf_printer + api-grpc +) +target_sources(ydb-services-fq PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/fq/grpc_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/fq/private_grpc.cpp +) diff --git a/ydb/services/fq/grpc_service.cpp b/ydb/services/fq/grpc_service.cpp new file mode 100644 index 00000000000..867714214dd --- /dev/null +++ b/ydb/services/fq/grpc_service.cpp @@ -0,0 +1,278 @@ +#include "grpc_service.h" + +#include <ydb/core/grpc_services/grpc_helper.h> +#include <ydb/core/grpc_services/grpc_request_proxy.h> +#include <ydb/core/grpc_services/rpc_calls.h> +#include <ydb/core/grpc_services/service_fq.h> +#include <ydb/library/protobuf_printer/security_printer.h> + +namespace NKikimr::NGRpcService { + +TGRpcFederatedQueryService::TGRpcFederatedQueryService(NActors::TActorSystem *system, + TIntrusivePtr<NMonitoring::TDynamicCounters> counters, NActors::TActorId id) + : ActorSystem_(system) + , Counters_(counters) + , GRpcRequestProxyId_(id) {} + +void TGRpcFederatedQueryService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) { + CQ_ = cq; + SetupIncomingRequests(std::move(logger)); +} + +void TGRpcFederatedQueryService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { + Limiter_ = limiter; +} + +bool TGRpcFederatedQueryService::IncRequest() { + return Limiter_->Inc(); +} + +void TGRpcFederatedQueryService::DecRequest() { + Limiter_->Dec(); + Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); +} + +void TGRpcFederatedQueryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { + auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + + using NPerms = NKikimr::TEvTicketParser::TEvAuthorizeTicket; + + static const std::function CreateQueryPermissions{[](const FederatedQuery::CreateQueryRequest& request) { + TVector<NPerms::TPermission> permissions{ + NPerms::Required("yq.queries.create"), + NPerms::Optional("yq.connections.use"), + NPerms::Optional("yq.bindings.use") + }; + if (request.execute_mode() != FederatedQuery::SAVE) { + permissions.push_back(NPerms::Required("yq.queries.invoke")); + } + if (request.content().acl().visibility() == FederatedQuery::Acl::SCOPE) { + permissions.push_back(NPerms::Required("yq.resources.managePublic")); + } + return permissions; + }}; + + static const std::function ListQueriesPermissions{[](const FederatedQuery::ListQueriesRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.queries.get"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") + }; + }}; + + static const std::function DescribeQueryPermissions{[](const FederatedQuery::DescribeQueryRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.queries.get"), + NPerms::Optional("yq.queries.viewAst"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") + }; + }}; + + static const std::function GetQueryStatusPermissions{[](const FederatedQuery::GetQueryStatusRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.queries.getStatus"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") + }; + }}; + + static const std::function ModifyQueryPermissions{[](const FederatedQuery::ModifyQueryRequest& request) { + TVector<NPerms::TPermission> permissions{ + NPerms::Required("yq.queries.update"), + NPerms::Optional("yq.connections.use"), + NPerms::Optional("yq.bindings.use"), + NPerms::Optional("yq.resources.managePrivate") + }; + if (request.execute_mode() != FederatedQuery::SAVE) { + permissions.push_back(NPerms::Required("yq.queries.invoke")); + } + if (request.content().acl().visibility() == FederatedQuery::Acl::SCOPE) { + permissions.push_back(NPerms::Required("yq.resources.managePublic")); + } + return permissions; + }}; + + static const std::function DeleteQueryPermissions{[](const FederatedQuery::DeleteQueryRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.queries.delete"), + NPerms::Optional("yq.resources.managePublic"), + NPerms::Optional("yq.resources.managePrivate") + }; + }}; + + static const std::function ControlQueryPermissions{[](const FederatedQuery::ControlQueryRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.queries.control"), + NPerms::Optional("yq.resources.managePublic"), + NPerms::Optional("yq.resources.managePrivate") + }; + }}; + + static const std::function GetResultDataPermissions{[](const FederatedQuery::GetResultDataRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.queries.getData"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") + }; + }}; + + static const std::function ListJobsPermissions{[](const FederatedQuery::ListJobsRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.jobs.get"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") + }; + }}; + + static const std::function DescribeJobPermissions{[](const FederatedQuery::DescribeJobRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.jobs.get"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") + }; + }}; + + static const std::function CreateConnectionPermissions{[](const FederatedQuery::CreateConnectionRequest& request) { + TVector<NPerms::TPermission> permissions{ + NPerms::Required("yq.connections.create"), + }; + if (request.content().acl().visibility() == FederatedQuery::Acl::SCOPE) { + permissions.push_back(NPerms::Required("yq.resources.managePublic")); + } + return permissions; + }}; + + static const std::function ListConnectionsPermissions{[](const FederatedQuery::ListConnectionsRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.connections.get"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") + }; + }}; + + static const std::function DescribeConnectionPermissions{[](const FederatedQuery::DescribeConnectionRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.connections.get"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") + }; + }}; + + static const std::function ModifyConnectionPermissions{[](const FederatedQuery::ModifyConnectionRequest& request) { + TVector<NPerms::TPermission> permissions{ + NPerms::Required("yq.connections.update"), + NPerms::Optional("yq.resources.managePrivate") + }; + if (request.content().acl().visibility() == FederatedQuery::Acl::SCOPE) { + permissions.push_back(NPerms::Required("yq.resources.managePublic")); + } + return permissions; + }}; + + static const std::function DeleteConnectionPermissions{[](const FederatedQuery::DeleteConnectionRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.connections.delete"), + NPerms::Optional("yq.resources.managePublic"), + NPerms::Optional("yq.resources.managePrivate") + }; + }}; + + static const std::function TestConnectionPermissions{[](const FederatedQuery::TestConnectionRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.connections.create") + }; + }}; + + static const std::function CreateBindingPermissions{[](const FederatedQuery::CreateBindingRequest&) { + TVector<NPerms::TPermission> permissions{ + NPerms::Required("yq.bindings.create"), + }; + // For use in binding links on connection with visibility SCOPE, + // the yq.resources.managePublic permission is required. But there + // is no information about connection visibility in this place, + // so yq.resources.managePublic is always requested as optional + permissions.push_back(NPerms::Optional("yq.resources.managePublic")); + return permissions; + }}; + + static const std::function ListBindingsPermissions{[](const FederatedQuery::ListBindingsRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.bindings.get"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") + }; + }}; + + static const std::function DescribeBindingPermissions{[](const FederatedQuery::DescribeBindingRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.bindings.get"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") + }; + }}; + + static const std::function ModifyBindingPermissions{[](const FederatedQuery::ModifyBindingRequest&) { + TVector<NPerms::TPermission> permissions{ + NPerms::Required("yq.bindings.update"), + NPerms::Optional("yq.resources.managePrivate") + }; + // For use in binding links on connection with visibility SCOPE, + // the yq.resources.managePublic permission is required. But there + // is no information about connection visibility in this place, + // so yq.resources.managePublic is always requested as optional + permissions.push_back(NPerms::Optional("yq.resources.managePublic")); + return permissions; + }}; + + static const std::function DeleteBindingPermissions{[](const FederatedQuery::DeleteBindingRequest&) -> TVector<NPerms::TPermission> { + return { + NPerms::Required("yq.bindings.delete"), + NPerms::Optional("yq.resources.managePublic"), + NPerms::Optional("yq.resources.managePrivate") + }; + }}; + +#ifdef ADD_REQUEST +#error ADD_REQUEST macro already defined +#endif +#define ADD_REQUEST(NAME, CB, PERMISSIONS) \ +MakeIntrusive<TGRpcRequest<FederatedQuery::NAME##Request, FederatedQuery::NAME##Response, TGRpcFederatedQueryService, TSecurityTextFormatPrinter<FederatedQuery::NAME##Request>, TSecurityTextFormatPrinter<FederatedQuery::NAME##Response>>>( \ + this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcFqRequestOperationCall<FederatedQuery::NAME##Request, FederatedQuery::NAME##Response> \ + (ctx, &CB, PERMISSIONS)); \ + }, \ + &FederatedQuery::V1::FederatedQueryService::AsyncService::Request##NAME, \ + #NAME, logger, getCounterBlock("fq", #NAME)) \ + ->Run(); \ + + ADD_REQUEST(CreateQuery, DoFederatedQueryCreateQueryRequest, CreateQueryPermissions) + ADD_REQUEST(ListQueries, DoFederatedQueryListQueriesRequest, ListQueriesPermissions) + ADD_REQUEST(DescribeQuery, DoFederatedQueryDescribeQueryRequest, DescribeQueryPermissions) + ADD_REQUEST(GetQueryStatus, DoFederatedQueryGetQueryStatusRequest, GetQueryStatusPermissions) + ADD_REQUEST(ModifyQuery, DoFederatedQueryModifyQueryRequest, ModifyQueryPermissions) + ADD_REQUEST(DeleteQuery, DoFederatedQueryDeleteQueryRequest, DeleteQueryPermissions) + ADD_REQUEST(ControlQuery, DoFederatedQueryControlQueryRequest, ControlQueryPermissions) + ADD_REQUEST(GetResultData, DoGetResultDataRequest, GetResultDataPermissions) + ADD_REQUEST(ListJobs, DoListJobsRequest, ListJobsPermissions) + ADD_REQUEST(DescribeJob, DoDescribeJobRequest, DescribeJobPermissions) + ADD_REQUEST(CreateConnection, DoCreateConnectionRequest, CreateConnectionPermissions) + ADD_REQUEST(ListConnections, DoListConnectionsRequest, ListConnectionsPermissions) + ADD_REQUEST(DescribeConnection, DoDescribeConnectionRequest, DescribeConnectionPermissions) + ADD_REQUEST(ModifyConnection, DoModifyConnectionRequest, ModifyConnectionPermissions) + ADD_REQUEST(DeleteConnection, DoDeleteConnectionRequest, DeleteConnectionPermissions) + ADD_REQUEST(TestConnection, DoTestConnectionRequest, TestConnectionPermissions) + ADD_REQUEST(CreateBinding, DoCreateBindingRequest, CreateBindingPermissions) + ADD_REQUEST(ListBindings, DoListBindingsRequest, ListBindingsPermissions) + ADD_REQUEST(DescribeBinding, DoDescribeBindingRequest, DescribeBindingPermissions) + ADD_REQUEST(ModifyBinding, DoModifyBindingRequest, ModifyBindingPermissions) + ADD_REQUEST(DeleteBinding, DoDeleteBindingRequest, DeleteBindingPermissions) + +#undef ADD_REQUEST + +} + +} // namespace NKikimr::NGRpcService diff --git a/ydb/services/fq/grpc_service.h b/ydb/services/fq/grpc_service.h new file mode 100644 index 00000000000..4bc79d0e8a7 --- /dev/null +++ b/ydb/services/fq/grpc_service.h @@ -0,0 +1,36 @@ +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> + +#include <library/cpp/grpc/server/grpc_server.h> + +#include <ydb/public/api/grpc/fq_v1.grpc.pb.h> + +namespace NKikimr { +namespace NGRpcService { + +class TGRpcFederatedQueryService + : public NGrpc::TGrpcServiceBase<FederatedQuery::V1::FederatedQueryService> +{ +public: + TGRpcFederatedQueryService(NActors::TActorSystem* system, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, + NActors::TActorId id); + + void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; + void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; + + bool IncRequest(); + void DecRequest(); +private: + void SetupIncomingRequests(NGrpc::TLoggerPtr logger); + + NActors::TActorSystem* ActorSystem_; + grpc::ServerCompletionQueue* CQ_ = nullptr; + + TIntrusivePtr<NMonitoring::TDynamicCounters> Counters_; + NActors::TActorId GRpcRequestProxyId_; + NGrpc::TGlobalLimiter* Limiter_ = nullptr; +}; + +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/services/fq/private_grpc.cpp b/ydb/services/fq/private_grpc.cpp new file mode 100644 index 00000000000..d7ce6a320c1 --- /dev/null +++ b/ydb/services/fq/private_grpc.cpp @@ -0,0 +1,66 @@ +#include "private_grpc.h" + +#include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/grpc_helper.h> +#include <ydb/core/grpc_services/service_fq_internal.h> +#include <ydb/library/protobuf_printer/security_printer.h> + +namespace NKikimr { +namespace NGRpcService { + +TGRpcFqPrivateTaskService::TGRpcFqPrivateTaskService(NActors::TActorSystem *system, + TIntrusivePtr<NMonitoring::TDynamicCounters> counters, NActors::TActorId id) + : ActorSystem_(system) + , Counters_(counters) + , GRpcRequestProxyId_(id) {} + +void TGRpcFqPrivateTaskService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) { + CQ_ = cq; + SetupIncomingRequests(std::move(logger)); +} + +void TGRpcFqPrivateTaskService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { + Limiter_ = limiter; +} + +bool TGRpcFqPrivateTaskService::IncRequest() { + return Limiter_->Inc(); +} + +void TGRpcFqPrivateTaskService::DecRequest() { + Limiter_->Dec(); + Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); +} + +void TGRpcFqPrivateTaskService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { + auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + +#ifdef ADD_REQUEST +#error ADD_REQUEST macro already defined +#endif +#define ADD_REQUEST(NAME, CB) \ +MakeIntrusive<TGRpcRequest<Fq::Private::NAME##Request, Fq::Private::NAME##Response, TGRpcFqPrivateTaskService, TSecurityTextFormatPrinter<Fq::Private::NAME##Request>, TSecurityTextFormatPrinter<Fq::Private::NAME##Response>>>( \ + this, &Service_, CQ_, \ + [this](NGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestOperationCall<Fq::Private::NAME##Request, Fq::Private::NAME##Response> \ + (ctx, &CB)); \ + }, \ + &Fq::Private::V1::FqPrivateTaskService::AsyncService::Request##NAME, \ + #NAME, logger, getCounterBlock("fq_internal", #NAME)) \ + ->Run(); \ + + ADD_REQUEST(PingTask, DoFqPrivatePingTaskRequest) + + ADD_REQUEST(GetTask, DoFqPrivateGetTaskRequest) + + ADD_REQUEST(WriteTaskResult, DoFqPrivateWriteTaskResultRequest) + + ADD_REQUEST(NodesHealthCheck, DoFqPrivateNodesHealthCheckRequest) + +#undef ADD_REQUEST +} + +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/services/fq/private_grpc.h b/ydb/services/fq/private_grpc.h new file mode 100644 index 00000000000..ddd5252a0e2 --- /dev/null +++ b/ydb/services/fq/private_grpc.h @@ -0,0 +1,34 @@ +#pragma once + +#include <library/cpp/actors/core/actorsystem.h> +#include <library/cpp/grpc/server/grpc_server.h> +#include <ydb/public/api/grpc/draft/yql_db_v1_fq.grpc.pb.h> + +namespace NKikimr { +namespace NGRpcService { + +class TGRpcFqPrivateTaskService + : public NGrpc::TGrpcServiceBase<Fq::Private::V1::FqPrivateTaskService> +{ +public: + TGRpcFqPrivateTaskService(NActors::TActorSystem* system, TIntrusivePtr<NMonitoring::TDynamicCounters> counters, + NActors::TActorId id); + + void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; + void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; + + bool IncRequest(); + void DecRequest(); +private: + void SetupIncomingRequests(NGrpc::TLoggerPtr logger); + + NActors::TActorSystem* ActorSystem_; + grpc::ServerCompletionQueue* CQ_ = nullptr; + + TIntrusivePtr<NMonitoring::TDynamicCounters> Counters_; + NActors::TActorId GRpcRequestProxyId_; + NGrpc::TGlobalLimiter* Limiter_ = nullptr; +}; + +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/services/fq/ut_integration/CMakeLists.darwin.txt b/ydb/services/fq/ut_integration/CMakeLists.darwin.txt new file mode 100644 index 00000000000..d8f6f2a9913 --- /dev/null +++ b/ydb/services/fq/ut_integration/CMakeLists.darwin.txt @@ -0,0 +1,61 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-fq-ut_integration) +target_compile_options(ydb-services-fq-ut_integration PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-fq-ut_integration PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/fq +) +target_link_libraries(ydb-services-fq-ut_integration PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-fq + library-cpp-getopt + cpp-grpc-client + cpp-regex-pcre + library-cpp-svnversion + ydb-core-testlib + yq-libs-control_plane_storage + yq-libs-db_schema + yq-libs-private_client + providers-common-db_id_async_resolver + yql-sql-pg_dummy + common-clickhouse-client + library-yql-utils + public-lib-fq + ydb-services-ydb +) +target_link_options(ydb-services-fq-ut_integration PRIVATE + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation +) +target_sources(ydb-services-fq-ut_integration PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/fq/ut_integration/ut_utils.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/fq/ut_integration/fq_ut.cpp +) +add_test( + NAME + ydb-services-fq-ut_integration + COMMAND + ydb-services-fq-ut_integration + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-services-fq-ut_integration) diff --git a/ydb/services/fq/ut_integration/CMakeLists.linux.txt b/ydb/services/fq/ut_integration/CMakeLists.linux.txt new file mode 100644 index 00000000000..a08124095b2 --- /dev/null +++ b/ydb/services/fq/ut_integration/CMakeLists.linux.txt @@ -0,0 +1,65 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(ydb-services-fq-ut_integration) +target_compile_options(ydb-services-fq-ut_integration PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(ydb-services-fq-ut_integration PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/fq +) +target_link_libraries(ydb-services-fq-ut_integration PUBLIC + contrib-libs-cxxsupp + yutil + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache + library-cpp-cpuid_check + cpp-testing-unittest_main + ydb-services-fq + library-cpp-getopt + cpp-grpc-client + cpp-regex-pcre + library-cpp-svnversion + ydb-core-testlib + yq-libs-control_plane_storage + yq-libs-db_schema + yq-libs-private_client + providers-common-db_id_async_resolver + yql-sql-pg_dummy + clickhouse_client_udf + library-yql-utils + public-lib-fq + ydb-services-ydb +) +target_link_options(ydb-services-fq-ut_integration PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(ydb-services-fq-ut_integration PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/services/fq/ut_integration/ut_utils.cpp + ${CMAKE_SOURCE_DIR}/ydb/services/fq/ut_integration/fq_ut.cpp +) +add_test( + NAME + ydb-services-fq-ut_integration + COMMAND + ydb-services-fq-ut_integration + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +vcs_info(ydb-services-fq-ut_integration) diff --git a/ydb/services/fq/ut_integration/CMakeLists.txt b/ydb/services/fq/ut_integration/CMakeLists.txt new file mode 100644 index 00000000000..fc7b1ee73ce --- /dev/null +++ b/ydb/services/fq/ut_integration/CMakeLists.txt @@ -0,0 +1,13 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (APPLE) + include(CMakeLists.darwin.txt) +elseif (UNIX AND NOT APPLE) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/services/fq/ut_integration/fq_ut.cpp b/ydb/services/fq/ut_integration/fq_ut.cpp new file mode 100644 index 00000000000..b7024005d90 --- /dev/null +++ b/ydb/services/fq/ut_integration/fq_ut.cpp @@ -0,0 +1,998 @@ +#include <ydb/services/ydb/ydb_common_ut.h> +#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> + +#include <ydb/public/lib/fq/fq.h> +#include <ydb/public/lib/fq/helpers.h> +#include <ydb/core/yq/libs/db_schema/db_schema.h> +#include <ydb/core/yq/libs/mock/yql_mock.h> +#include <ydb/core/yq/libs/private_client/private_client_fq.h> + +#include <ydb/core/yq/libs/control_plane_storage/message_builders.h> +#include <ydb/core/yq/libs/actors/database_resolver.h> + +#include <ydb/library/yql/public/issue/yql_issue_message.h> + +#include <library/cpp/protobuf/util/pb_io.h> +#include <library/cpp/retry/retry.h> +#include <util/system/mutex.h> +#include "ut_utils.h" +#include <google/protobuf/util/time_util.h> + +#include <ydb/public/lib/fq/scope.h> +#include <util/system/hostname.h> + +#include <util/string/split.h> + +using namespace NYdb; +using namespace FederatedQuery; +using namespace NYdb::NFq; + +namespace { + const ui32 Retries = 10; + + void PrintProtoIssues(const NProtoBuf::RepeatedPtrField<::Ydb::Issue::IssueMessage>& protoIssues) { + if (protoIssues.empty()) { + Cerr << "No Issues" << Endl; + return; + } + NYql::TIssues issues; + NYql::IssuesFromMessage(protoIssues, issues); + Cerr << ">>> Issues: " << issues.ToString() << Endl; + } + + TString CreateNewHistoryAndWaitFinish(const TString& folderId, + NYdb::NFq::TClient& client, const TString& yqlText, + const FederatedQuery::QueryMeta::ComputeStatus& expectedStatusResult) + { + //CreateQuery + TString queryId; + { + auto request = ::NFq::TCreateQueryBuilder{} + .SetText(yqlText) + .Build(); + auto result = client.CreateQuery( + request, CreateFqSettings<TCreateQuerySettings>(folderId)) + .ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + queryId = result.GetResult().query_id(); + } + // GetQueryStatus + const auto request = ::NFq::TGetQueryStatusBuilder{} + .SetQueryId(queryId) + .Build(); + const auto result = DoWithRetryOnRetCode([&]() { + auto result = client.GetQueryStatus( + request, CreateFqSettings<TGetQueryStatusSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + return result.GetResult().status() == expectedStatusResult; + }, TRetryOptions(Retries)); + UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit"); + + return queryId; + } + + void CheckGetResultData( + NYdb::NFq::TClient& client, + const TString& queryId, + const TString& folderId, + const ui64 rowsCount, + const int colsSize, + const int answer) + { + const auto request = ::NFq::TGetResultDataBuilder{} + .SetQueryId(queryId) + .Build(); + const auto result = client.GetResultData( + request, CreateFqSettings<TGetResultDataSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + const auto& resultSet = result.GetResult().result_set(); + UNIT_ASSERT_VALUES_EQUAL(resultSet.rows().size(), rowsCount); + UNIT_ASSERT_VALUES_EQUAL(resultSet.columns().size(), colsSize); + if (!resultSet.rows().empty()) { + const auto& item = resultSet.rows(0).items(0); + TString str = item.DebugString(); + TVector<TString> arr; + StringSplitter(str).Split(' ').SkipEmpty().AddTo(&arr); + Y_VERIFY(arr.size() == 2, "Incorrect numeric result"); + UNIT_ASSERT_VALUES_EQUAL(std::stoi(arr.back()), answer); + } + } +} + +Y_UNIT_TEST_SUITE(Yq_1) { + Y_UNIT_TEST(Basic) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + NYdb::NFq::TClient client(driver); + const auto folderId = "some_folder_id"; + const auto queryId = CreateNewHistoryAndWaitFinish(folderId, client, "select 1", FederatedQuery::QueryMeta::COMPLETED); + CheckGetResultData(client, queryId, folderId, 1, 1, 1); + + { + const auto request = ::NFq::TDescribeQueryBuilder() + .SetQueryId("foo") + .Build(); + const auto result = DoWithRetryOnRetCode([&]() { + auto result = client.DescribeQuery( + request, CreateFqSettings<TDescribeQuerySettings>("WTF")) + .ExtractValueSync(); + return result.GetStatus() == EStatus::BAD_REQUEST; + }, TRetryOptions(Retries)); + UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit"); + } + + { + auto request = ::NFq::TListQueriesBuilder{}.Build(); + auto result = DoWithRetryOnRetCode([&]() { + auto result = client.ListQueries( + request, CreateFqSettings<TListQueriesSettings>("WTF")) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResult().query().size(), 0); + return result.GetStatus() == EStatus::SUCCESS; + }, TRetryOptions(Retries)); + UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit"); + } + + { + const auto request = ::NFq::TDescribeQueryBuilder() + .SetQueryId(queryId) + .Build(); + auto result = client.DescribeQuery( + request, CreateFqSettings<TDescribeQuerySettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + const auto status = result.GetResult().query().meta().status(); + UNIT_ASSERT(status == FederatedQuery::QueryMeta::COMPLETED); + } + + { + const auto request = ::NFq::TModifyQueryBuilder() + .SetQueryId(queryId) + .SetName("MODIFIED_NAME") + .Build(); + const auto result = client.ModifyQuery( + request, CreateFqSettings<TModifyQuerySettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const auto request = ::NFq::TDescribeQueryBuilder() + .SetQueryId(queryId) + .Build(); + const auto result = client.DescribeQuery( + request, CreateFqSettings<TDescribeQuerySettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + const auto& res = result.GetResult(); + UNIT_ASSERT_VALUES_EQUAL(res.query().content().name(), "MODIFIED_NAME"); + UNIT_ASSERT(res.query().content().acl().visibility() == static_cast<int>(Acl_Visibility_SCOPE)); + } + + { + const auto request = ::NFq::TDescribeQueryBuilder() + .SetQueryId("") + .Build(); + const auto result = client.DescribeQuery( + request, CreateFqSettings<TDescribeQuerySettings>("")) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); + } + + { + const auto request = ::NFq::TGetResultDataBuilder() + .SetQueryId("") + .Build(); + const auto result = client.GetResultData( + request, CreateFqSettings<TGetResultDataSettings>("")) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(Basic_EmptyTable) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + UpsertToExistingTable(driver, location); + NYdb::NFq::TClient client(driver); + const TString folderId = "some_folder_id"; + { + const auto request = ::NFq::TCreateConnectionBuilder() + .SetName("testdbempty") + .CreateYdb("Root", location, "") + .Build(); + const auto result = client + .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + } + + const TString queryId = CreateNewHistoryAndWaitFinish( + folderId, client, + "select count(*) from testdbempty.`yq/empty_table`", + FederatedQuery::QueryMeta::COMPLETED); + CheckGetResultData(client, queryId, folderId, 1, 1, 0); + } + + Y_UNIT_TEST(Basic_EmptyList) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + NYdb::NFq::TClient client(driver); + const TString folderId = "some_folder_id"; + auto expectedStatus = FederatedQuery::QueryMeta::COMPLETED; + CreateNewHistoryAndWaitFinish(folderId, client, "select []", expectedStatus); + } + + Y_UNIT_TEST(Basic_EmptyDict) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + NYdb::NFq::TClient client(driver); + const TString folderId = "some_folder_id"; + auto expectedStatus = FederatedQuery::QueryMeta::COMPLETED; + CreateNewHistoryAndWaitFinish(folderId, client, "select {}", expectedStatus); + } + + Y_UNIT_TEST(Basic_Null) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + NYdb::NFq::TClient client(driver); + const TString folderId = "some_folder_id"; + auto expectedStatus = FederatedQuery::QueryMeta::COMPLETED; + CreateNewHistoryAndWaitFinish(folderId, client, "select null", expectedStatus); + } + + SIMPLE_UNIT_FORKED_TEST(Basic_Tagged) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + NYdb::NFq::TClient client(driver); + const TString folderId = "some_folder_id"; + + + { + auto request = ::NFq::TCreateConnectionBuilder{} + .SetName("testdb00") + .CreateYdb("Root", location, "") + .Build(); + + auto result = client.CreateConnection( + request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + auto expectedStatus = FederatedQuery::QueryMeta::COMPLETED; + CreateNewHistoryAndWaitFinish(folderId, client, "select AsTagged(count(*), \"tag\") from testdb00.`yq/connections`", expectedStatus); + } + + Y_UNIT_TEST(Basic_TaggedLiteral) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + NYdb::NFq::TClient client(driver); + const TString folderId = "some_folder_id"; + + auto expectedStatus = FederatedQuery::QueryMeta::COMPLETED; + CreateNewHistoryAndWaitFinish(folderId, client, "select AsTagged(1, \"tag\")", expectedStatus); + } + + // use fork for data test due to ch initialization problem + SIMPLE_UNIT_FORKED_TEST(ExtendedDatabaseId) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + + NYdb::NFq::TClient client(driver); + const TString folderId = "folder_id_" + CreateGuidAsString(); + { + const auto request = ::NFq::TCreateConnectionBuilder() + .SetName("testdb01") + .CreateYdb("FakeDatabaseId", "") + .Build(); + const auto result = client + .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const auto request = ::NFq::TCreateConnectionBuilder() + .SetName("testdb02") + .CreateYdb("FakeDatabaseId", "") + .Build(); + const auto result = client + .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const auto queryId = CreateNewHistoryAndWaitFinish(folderId, client, + "select count(*) from testdb01.`yq/connections`", FederatedQuery::QueryMeta::COMPLETED); + CheckGetResultData(client, queryId, folderId, 1, 1, 2); + } + + { + // test connections db with 2 databaseId + const auto queryId = CreateNewHistoryAndWaitFinish(folderId, client, + "select count(*) from testdb02.`yq/connections`", FederatedQuery::QueryMeta::COMPLETED); + CheckGetResultData(client, queryId, folderId, 1, 1, 2); + } + } + + Y_UNIT_TEST(DescribeConnection) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + NYdb::NFq::TClient client(driver); + const TString folderId = "some_folder_id"; + TString conId; + { + const auto request = ::NFq::TCreateConnectionBuilder() + .SetName("created_conn") + .CreateYdb("created_db", "") + .Build(); + const auto result = client + .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + conId = result.GetResult().connection_id(); + } + { + auto request = ::NFq::TDescribeConnectionBuilder() + .SetConnectionId(conId) + .Build(); + auto result = client + .DescribeConnection(request, CreateFqSettings<TDescribeConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + const auto& res = result.GetResult().connection(); + UNIT_ASSERT_VALUES_EQUAL(res.meta().id(), conId); + UNIT_ASSERT_VALUES_EQUAL(res.meta().created_by(), "root@builtin"); + UNIT_ASSERT_VALUES_EQUAL(res.meta().modified_by(), "root@builtin"); + UNIT_ASSERT_VALUES_EQUAL(res.content().name(), "created_conn"); + } + } + + Y_UNIT_TEST(ListConnections) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + NYdb::NFq::TClient client(driver); + const size_t conns = 3; + const auto folderId = TString(__func__) + "folder_id"; + {//CreateConnections + for (size_t i = 0; i < conns - 1; ++i) { + const auto request = ::NFq::TCreateConnectionBuilder() + .SetName("testdb" + ToString(i)) + .CreateYdb("FakeDatabaseId", "") + .Build(); + const auto result = client + .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + } + + // yds + const auto request = ::NFq::TCreateConnectionBuilder() + .SetName("testdb2") + .CreateDataStreams("FakeDatabaseId", "") // We can use the same db in yds and ydb + .Build(); + const auto result = client + .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const auto request = ::NFq::TListConnectionsBuilder().Build(); + auto result = client + .ListConnections(request, CreateFqSettings<TListConnectionsSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResult().connection().size(), conns); + size_t i = 0; + auto res = result.GetResult(); + auto* conns = res.mutable_connection(); + std::sort(conns->begin(), conns->end(), [&](const auto& lhs, const auto& rhs) { + return lhs.content().name() < rhs.content().name(); + }); + for (const auto& conn : *conns) { + const auto& content = conn.content(); + const auto& meta = conn.meta(); + UNIT_ASSERT_VALUES_EQUAL(content.name(), "testdb" + ToString(i)); + UNIT_ASSERT_VALUES_EQUAL(meta.created_by(), "root@builtin"); + UNIT_ASSERT_VALUES_EQUAL(meta.modified_by(), "root@builtin"); + if (i < 2) { + UNIT_ASSERT_C(content.setting().has_ydb_database(), content); + } else { + UNIT_ASSERT_C(content.setting().has_data_streams(), content); + } + i++; + } + } + } + + Y_UNIT_TEST(ListConnectionsOnEmptyConnectionsTable) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + NYdb::NFq::TClient client(driver); + + { + const auto request = ::NFq::TListConnectionsBuilder().Build(); + auto result = client + .ListConnections(request, CreateFqSettings<TListConnectionsSettings>("WTF")) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT(result.GetResult().connection().empty()); + } + } + + Y_UNIT_TEST(ModifyConnections) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + TString userToken = "root@builtin"; + TString userId = "root"; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken)); + NYdb::NFq::TClient client(driver); + const auto folderId = TString(__func__) + "folder_id"; + TString conId; + + { + const auto request = ::NFq::TCreateConnectionBuilder() + .SetName("created_conn") + .CreateYdb("created_db", "") + .Build(); + const auto result = client + .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + conId = result.GetResult().connection_id(); + } + + {//Modify + const auto request = ::NFq::TModifyConnectionBuilder() + .SetName("modified_name") + .SetConnectionId(conId) + .CreateYdb("new ydb", "") + .SetDescription("Modified") + .Build(); + const auto result = client + .ModifyConnection(request, CreateFqSettings<TModifyConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto request = ::NFq::TDescribeConnectionBuilder() + .SetConnectionId(conId) + .Build(); + auto result = client + .DescribeConnection(request, CreateFqSettings<TDescribeConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + const auto& res = result.GetResult().connection(); + UNIT_ASSERT_VALUES_EQUAL(res.meta().id(), conId); + UNIT_ASSERT_VALUES_EQUAL(res.meta().created_by(), "root@builtin"); + UNIT_ASSERT_VALUES_EQUAL(res.meta().modified_by(), "root@builtin"); + UNIT_ASSERT_VALUES_EQUAL(res.content().name(), "modified_name"); + UNIT_ASSERT_VALUES_EQUAL(res.content().description(), "Modified"); + } + } + + Y_UNIT_TEST(DeleteConnections) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + NYdb::NFq::TClient client(driver); + const auto folderId = TString(__func__) + "folder_id"; + TString conId; + + { + const auto request = ::NFq::TCreateConnectionBuilder() + .SetName("created_conn") + .CreateYdb("created_db", "") + .Build(); + const auto result = client + .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + conId = result.GetResult().connection_id(); + } + + { + const auto request = ::NFq::TDeleteConnectionBuilder() + .SetConnectionId(conId) + .Build(); + + const auto result = client + .DeleteConnection(request, CreateFqSettings<TDeleteConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(Create_And_Modify_The_Same_Connection) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + TString userToken = "root@builtin"; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken)); + NYdb::NFq::TClient client(driver); + + const auto folderId = TString(__func__) + "folder_id"; + TString conId; + + { + const auto request = ::NFq::TCreateConnectionBuilder() + .SetName("created_conn") + .CreateYdb("created_db", "") + .Build(); + const auto result = client + .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + conId = result.GetResult().connection_id(); + } + + { + const auto request = ::NFq::TModifyConnectionBuilder() + .SetConnectionId(conId) + .CreateYdb("modified_db", "")//TODO remove + .Build(); + const auto result = client + .ModifyConnection(request, CreateFqSettings<TModifyConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(CreateConnection_With_Existing_Name) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + TString userToken = "root@builtin"; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken)); + NYdb::NFq::TClient client(driver); + + const auto folderId = TString(__func__) + "folder_id"; + auto name = TString(__func__) + "_name"; + name.to_lower(); + + { + const auto request = ::NFq::TCreateConnectionBuilder() + .SetName(name) + .CreateYdb("created_db", "") + .Build(); + const auto result = client + .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + const auto request = ::NFq::TCreateConnectionBuilder() + .SetName(name) + .CreateYdb("created_db", "") + .Build(); + const auto result = client + .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::BAD_REQUEST, result.GetIssues().ToString()); //TODO status should be ALREADY_EXISTS + } + } + + Y_UNIT_TEST(CreateConnections_With_Idempotency) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + TString userToken = "root@builtin"; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken)); + NYdb::NFq::TClient client(driver); + + const auto folderId = TString(__func__) + "folder_id"; + const auto name = "connection_name"; + const TString idempotencyKey = "idempotency_key"; + TString conId; + + { + const auto request = ::NFq::TCreateConnectionBuilder() + .SetName(name) + .SetIdempotencyKey(idempotencyKey) + .CreateYdb("created_db", "") + .Build(); + const auto result = client + .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + conId = result.GetResult().connection_id(); + } + + { + const auto request = ::NFq::TCreateConnectionBuilder() + .SetName(name) + .SetIdempotencyKey(idempotencyKey) + .CreateYdb("created_db", "") + .Build(); + const auto result = client + .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(conId, result.GetResult().connection_id()); + } + } + + Y_UNIT_TEST(CreateQuery_With_Idempotency) {//TODO Fix + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + TString userToken = "root@builtin"; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken)); + NYdb::NFq::TClient client(driver); + + const auto folderId = TString(__func__) + "folder_id"; + const TString idempotencyKey = "idempotency_key"; + const TString yqlText = "select 1"; + TString queryId; + const auto request = ::NFq::TCreateQueryBuilder{} + .SetText(yqlText) + .SetIdempotencyKey(idempotencyKey) + .Build(); + + { + auto result = client.CreateQuery( + request, CreateFqSettings<TCreateQuerySettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + queryId = result.GetResult().query_id(); + } + + { + const auto req = ::NFq::TDescribeQueryBuilder{} + .SetQueryId(queryId) + .Build(); + const auto result = DoWithRetryOnRetCode([&]() { + auto result = client.DescribeQuery( + req, CreateFqSettings<TDescribeQuerySettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + const auto status = result.GetResult().query().meta().status(); + PrintProtoIssues(result.GetResult().query().issue()); + return status == FederatedQuery::QueryMeta::COMPLETED; + }, TRetryOptions(Retries)); + UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit"); + } + { + auto result = client.CreateQuery( + request, CreateFqSettings<TCreateQuerySettings>(folderId)) + .ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(queryId, result.GetResult().query_id()); + } + CheckGetResultData(client, queryId, folderId, 1, 1, 1); + } + + // use fork for data test due to ch initialization problem + SIMPLE_UNIT_FORKED_TEST(CreateQuery_Without_Connection) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + TString userToken = "root@builtin"; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken)); + NYdb::NFq::TClient client(driver); + + const TString yqlText = "select count(*) from testdbWTF.`connections`"; + CreateNewHistoryAndWaitFinish("folder_id_WTF", client, + yqlText, FederatedQuery::QueryMeta::FAILED); + } + + Y_UNIT_TEST(DeleteQuery) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + TString userToken = "root@builtin"; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken)); + NYdb::NFq::TClient client(driver); + + const auto folderId = TString(__func__) + "folder_id"; + const TString yqlText = "select 1"; + const TString queryId = CreateNewHistoryAndWaitFinish(folderId, client, + yqlText, FederatedQuery::QueryMeta::COMPLETED); + CheckGetResultData(client, queryId, folderId, 1, 1, 1); + + { + const auto request = ::NFq::TDeleteQueryBuilder() + .SetQueryId(queryId) + .Build(); + auto result = client + .DeleteQuery(request, CreateFqSettings<TDeleteQuerySettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto request = ::NFq::TDescribeQueryBuilder() + .SetQueryId(queryId) + .Build(); + auto result = client + .DescribeQuery(request, CreateFqSettings<TDescribeQuerySettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString()); + } + } + + Y_UNIT_TEST(ModifyQuery) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + TString userToken = "root@builtin"; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken)); + NYdb::NFq::TClient client(driver); + + const auto folderId = TString(__func__) + "folder_id"; + const TString yqlText = "select 1"; + const TString queryId = CreateNewHistoryAndWaitFinish(folderId, client, + yqlText, FederatedQuery::QueryMeta::COMPLETED); + CheckGetResultData(client, queryId, folderId, 1, 1, 1); + + { + const auto request = ::NFq::TModifyQueryBuilder() + .SetQueryId(queryId) + .SetName("ModifiedName") + .SetDescription("OK") + .Build(); + auto result = client + .ModifyQuery(request, CreateFqSettings<TModifyQuerySettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto request = ::NFq::TDescribeQueryBuilder() + .SetQueryId(queryId) + .Build(); + auto result = client + .DescribeQuery(request, CreateFqSettings<TDescribeQuerySettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + const auto& query = result.GetResult().query(); + UNIT_ASSERT_VALUES_EQUAL(query.content().name(), "ModifiedName"); + UNIT_ASSERT_VALUES_EQUAL(query.content().description(), "OK"); + } + } + + Y_UNIT_TEST(DescribeJob) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + NYdb::NFq::TClient client(driver); + const auto folderId = "some_folder_id"; + const auto queryId = CreateNewHistoryAndWaitFinish(folderId, client, "select 1", FederatedQuery::QueryMeta::COMPLETED); + CheckGetResultData(client, queryId, folderId, 1, 1, 1); + TString jobId; + + { + auto request = ::NFq::TListJobsBuilder{}.SetQueryId(queryId).Build(); + auto result = DoWithRetryOnRetCode([&]() { + auto result = client.ListJobs( + request, CreateFqSettings<TListJobsSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResult().job_size(), 1); + jobId = result.GetResult().job(0).meta().id(); + return result.GetStatus() == EStatus::SUCCESS; + }, TRetryOptions(Retries)); + UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit"); + } + + { + const auto request = ::NFq::TDescribeJobBuilder() + .SetJobId(jobId) + .Build(); + auto result = client.DescribeJob( + request, CreateFqSettings<TDescribeJobSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(result.GetResult().job().query_meta().common().id(), queryId); + UNIT_ASSERT_VALUES_EQUAL(result.GetResult().job().meta().id(), jobId); + UNIT_ASSERT_VALUES_EQUAL(result.GetResult().job().query_name(), "test_query_name_1"); + } + } + + Y_UNIT_TEST(DescribeQuery) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + NYdb::NFq::TClient client(driver); + const auto folderId = "some_folder_id"; + const auto queryId = CreateNewHistoryAndWaitFinish(folderId, client, "select 1", FederatedQuery::QueryMeta::COMPLETED); + CheckGetResultData(client, queryId, folderId, 1, 1, 1); + TString jobId; + + { + const auto request = ::NFq::TDescribeQueryBuilder() + .SetQueryId(queryId) + .Build(); + auto result = client.DescribeQuery( + request, CreateFqSettings<TDescribeQuerySettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + const auto query = result.GetResult().query(); + UNIT_ASSERT_VALUES_EQUAL(FederatedQuery::QueryMeta::ComputeStatus_Name(query.meta().status()), FederatedQuery::QueryMeta::ComputeStatus_Name(FederatedQuery::QueryMeta::COMPLETED)); + UNIT_ASSERT_VALUES_EQUAL(query.content().text(), "select 1"); + UNIT_ASSERT_VALUES_EQUAL(query.content().name(), "test_query_name_1"); + } + } +} + +Y_UNIT_TEST_SUITE(Yq_2) { + // use fork for data test due to ch initialization problem + Y_UNIT_TEST(Test_HostNameTrasformation) { + UNIT_ASSERT_VALUES_EQUAL(::NYq::TransformMdbHostToCorrectFormat("rc1c-p5waby2y5y1kb5ue.mdb.yandexcloud.net"), "rc1c-p5waby2y5y1kb5ue.db.yandex.net:8443"); + UNIT_ASSERT_VALUES_EQUAL(::NYq::TransformMdbHostToCorrectFormat("xxx.xxx"), "xxx.db.yandex.net:8443"); + UNIT_ASSERT_VALUES_EQUAL(::NYq::TransformMdbHostToCorrectFormat("host."), "host.db.yandex.net:8443"); + } + + SIMPLE_UNIT_FORKED_TEST(ReadFromYdbOverYq) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + NYdb::NFq::TClient client(driver); + const auto folderId = TString(__func__) + "folder_id"; + + { + auto request = ::NFq::TCreateConnectionBuilder{} + .SetName("testdb00") + .CreateYdb("Root", location, "") + .Build(); + + auto result = client.CreateConnection( + request, CreateFqSettings<TCreateConnectionSettings>(folderId)) + .ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + TString queryId; + { + auto request = ::NFq::TCreateQueryBuilder{} + .SetText("select count(*) from testdb00.`yq/connections`") + .Build(); + auto result = client.CreateQuery( + request, CreateFqSettings<TCreateQuerySettings>(folderId)) + .ExtractValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + queryId = result.GetResult().query_id(); + } + + { + auto request = ::NFq::TDescribeQueryBuilder{}.SetQueryId(queryId).Build(); + auto result = DoWithRetryOnRetCode([&]() { + auto result = client.DescribeQuery( + request, CreateFqSettings<TDescribeQuerySettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + const auto status = result.GetResult().query().meta().status(); + PrintProtoIssues(result.GetResult().query().issue()); + return status == FederatedQuery::QueryMeta::COMPLETED; + }, TRetryOptions(10)); + UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit"); + } + + { + auto request = ::NFq::TGetResultDataBuilder{}.SetQueryId(queryId).Build(); + auto result = client.GetResultData( + request, CreateFqSettings<TGetResultDataSettings>(folderId)) + .ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + const auto& resultSet = result.GetResult().result_set(); + UNIT_ASSERT_VALUES_EQUAL(resultSet.rows().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.columns().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.rows(0).items(0).uint64_value(), 1); + } + } +} + +Y_UNIT_TEST_SUITE(PrivateApi) { + Y_UNIT_TEST(PingTask) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + ::NFq::TPrivateClient client(driver); + const TString historyId = "id"; + const TString folderId = "folder_id"; + const TScope scope(folderId); + { + Fq::Private::PingTaskRequest req; + req.mutable_query_id()->set_value("id"); + req.set_scope(scope.ToString()); + req.set_owner_id("some_owner"); + req.set_status(FederatedQuery::QueryMeta::COMPLETED); + auto result = client.PingTask(std::move(req)).ExtractValueSync(); + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR); + } + } + + Y_UNIT_TEST(GetTask) {//PendingFetcher can take task first + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + ::NFq::TPrivateClient client(driver); + { + Fq::Private::GetTaskRequest req; + req.set_owner_id("owner_id"); + req.set_host("host"); + auto result = client.GetTask(std::move(req)).ExtractValueSync(); + result.GetIssues().PrintTo(Cerr); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + result.GetIssues().PrintTo(Cerr); + } + } + + Y_UNIT_TEST(Nodes) { + TKikimrWithGrpcAndRootSchema server({}, {}, {}, true); + ui16 grpc = server.GetPort(); + TString location = TStringBuilder() << "localhost:" << grpc; + auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); + ::NFq::TPrivateClient client(driver); + const auto instanceId = CreateGuidAsString(); + { + Fq::Private::NodesHealthCheckRequest req; + req.set_tenant("Tenant"); + auto& node = *req.mutable_node(); + node.set_hostname("hostname"); + node.set_node_id(100500); + node.set_instance_id(instanceId); + const auto result = DoWithRetryOnRetCode([&]() { + auto r = req; + auto result = client.NodesHealthCheck(std::move(r)).ExtractValueSync(); + if (result.GetStatus() == EStatus::SUCCESS) { + const auto& res = result.GetResult(); + UNIT_ASSERT(!res.nodes().empty()); + UNIT_ASSERT_VALUES_EQUAL(res.nodes(0).hostname(), "hostname"); + UNIT_ASSERT_VALUES_EQUAL(res.nodes(0).node_id(), 100500); + UNIT_ASSERT_VALUES_EQUAL(res.nodes(0).instance_id(), instanceId); + } + // result.GetIssues().PrintTo(Cerr); + return result.GetStatus() == EStatus::SUCCESS; + }, TRetryOptions(Retries)); + UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit"); + } + } +} diff --git a/ydb/services/fq/ut_integration/ut_utils.cpp b/ydb/services/fq/ut_integration/ut_utils.cpp new file mode 100644 index 00000000000..ccbe7b1998f --- /dev/null +++ b/ydb/services/fq/ut_integration/ut_utils.cpp @@ -0,0 +1,92 @@ +#include "ut_utils.h" + +#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> +#include <ydb/public/lib/fq/scope.h> +#include <ydb/core/yq/libs/actors/proxy.h> +#include <ydb/core/yq/libs/events/events.h> + +#include <library/cpp/time_provider/time_provider.h> +#include <library/cpp/testing/common/env.h> +#include <library/cpp/protobuf/util/pb_io.h> +#include <library/cpp/actors/testlib/test_runtime.h> + +#include <ydb/library/yql/utils/bind_in_range.h> + +#include <util/system/file.h> +#include <util/stream/str.h> +#include <util/string/printf.h> +#include <util/string/builder.h> + +#include <library/cpp/protobuf/json/proto2json.h> + +using namespace NYdb; +using namespace NYdb::NFq; +using namespace NActors; + +void UpsertToExistingTable(TDriver& driver, const TString& location){ + Y_UNUSED(location); + const TString tablePrefix = "Root/yq"; + NYdb::NTable::TClientSettings settings; + NYdb::NTable::TTableClient client(driver, settings); + auto sessionOp = client.CreateSession().ExtractValueSync(); + if (!sessionOp.IsSuccess()) { + ythrow yexception() << sessionOp.GetStatus() << "\n" << sessionOp.GetIssues().ToString(); + } + auto session = sessionOp.GetSession(); + auto timeProvider = CreateDefaultTimeProvider(); + auto now = timeProvider->Now(); + NYdb::TParamsBuilder paramsBuilder; + + paramsBuilder.AddParam("$now").Timestamp(now).Build(); + auto params = paramsBuilder.Build(); + + const TString scope = TScope("some_folder_id").ToString(); + + { + auto result = session.ExecuteSchemeQuery( + Sprintf(R"___( + --!syntax_v1 + CREATE TABLE `%s/%s` ( + id String, + PRIMARY KEY (id) + ); + )___", tablePrefix.c_str(), "empty_table") + ).ExtractValueSync(); + if (!result.IsSuccess()) { + ythrow yexception() << result.GetStatus() << "\n" << result.GetIssues().ToString(); + } + } + + { + NYdb::TParamsBuilder paramsBuilder; + auto result = session.ExecuteDataQuery( + Sprintf(R"___( + --!syntax_v1 + upsert into `%s/empty_table` + (id) + values + ("test_id1"), ("test_id2"); + )___", tablePrefix.c_str()), + NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(), + paramsBuilder.Build(), + NYdb::NTable::TExecDataQuerySettings()).ExtractValueSync(); + if (!result.IsSuccess()) { + ythrow yexception() << result.GetStatus() << "\n" << result.GetIssues().ToString(); + } + } + + { + NYdb::TParamsBuilder paramsBuilder; + auto result = session.ExecuteDataQuery( + Sprintf(R"___( + --!syntax_v1 + delete from `%s/empty_table`; + )___", tablePrefix.c_str()), + NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(), + paramsBuilder.Build(), + NYdb::NTable::TExecDataQuerySettings()).ExtractValueSync(); + if (!result.IsSuccess()) { + ythrow yexception() << result.GetStatus() << "\n" << result.GetIssues().ToString(); + } + } +} diff --git a/ydb/services/fq/ut_integration/ut_utils.h b/ydb/services/fq/ut_integration/ut_utils.h new file mode 100644 index 00000000000..a627c45929c --- /dev/null +++ b/ydb/services/fq/ut_integration/ut_utils.h @@ -0,0 +1,9 @@ +#pragma once + +#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> +#include <util/system/shellcommand.h> +#include <contrib/libs/curl/include/curl/curl.h> +#include <library/cpp/json/json_reader.h> + +void UpsertToExistingTable(NYdb::TDriver& driver, const TString& location); + diff --git a/ydb/services/yq/grpc_service.cpp b/ydb/services/yq/grpc_service.cpp index a7505e18d0d..c198b1c992c 100644 --- a/ydb/services/yq/grpc_service.cpp +++ b/ydb/services/yq/grpc_service.cpp @@ -256,20 +256,20 @@ MakeIntrusive<TGRpcRequest<YandexQuery::NAME##Request, YandexQuery::NAME##Respon ADD_REQUEST(ModifyQuery, DoYandexQueryModifyQueryRequest, ModifyQueryPermissions) ADD_REQUEST(DeleteQuery, DoYandexQueryDeleteQueryRequest, DeleteQueryPermissions) ADD_REQUEST(ControlQuery, DoYandexQueryControlQueryRequest, ControlQueryPermissions) - ADD_REQUEST(GetResultData, DoGetResultDataRequest, GetResultDataPermissions) - ADD_REQUEST(ListJobs, DoListJobsRequest, ListJobsPermissions) - ADD_REQUEST(DescribeJob, DoDescribeJobRequest, DescribeJobPermissions) - ADD_REQUEST(CreateConnection, DoCreateConnectionRequest, CreateConnectionPermissions) - ADD_REQUEST(ListConnections, DoListConnectionsRequest, ListConnectionsPermissions) - ADD_REQUEST(DescribeConnection, DoDescribeConnectionRequest, DescribeConnectionPermissions) - ADD_REQUEST(ModifyConnection, DoModifyConnectionRequest, ModifyConnectionPermissions) - ADD_REQUEST(DeleteConnection, DoDeleteConnectionRequest, DeleteConnectionPermissions) - ADD_REQUEST(TestConnection, DoTestConnectionRequest, TestConnectionPermissions) - ADD_REQUEST(CreateBinding, DoCreateBindingRequest, CreateBindingPermissions) - ADD_REQUEST(ListBindings, DoListBindingsRequest, ListBindingsPermissions) - ADD_REQUEST(DescribeBinding, DoDescribeBindingRequest, DescribeBindingPermissions) - ADD_REQUEST(ModifyBinding, DoModifyBindingRequest, ModifyBindingPermissions) - ADD_REQUEST(DeleteBinding, DoDeleteBindingRequest, DeleteBindingPermissions) + ADD_REQUEST(GetResultData, DoYandexQueryGetResultDataRequest, GetResultDataPermissions) + ADD_REQUEST(ListJobs, DoYandexQueryListJobsRequest, ListJobsPermissions) + ADD_REQUEST(DescribeJob, DoYandexQueryDescribeJobRequest, DescribeJobPermissions) + ADD_REQUEST(CreateConnection, DoYandexQueryCreateConnectionRequest, CreateConnectionPermissions) + ADD_REQUEST(ListConnections, DoYandexQueryListConnectionsRequest, ListConnectionsPermissions) + ADD_REQUEST(DescribeConnection, DoYandexQueryDescribeConnectionRequest, DescribeConnectionPermissions) + ADD_REQUEST(ModifyConnection, DoYandexQueryModifyConnectionRequest, ModifyConnectionPermissions) + ADD_REQUEST(DeleteConnection, DoYandexQueryDeleteConnectionRequest, DeleteConnectionPermissions) + ADD_REQUEST(TestConnection, DoYandexQueryTestConnectionRequest, TestConnectionPermissions) + ADD_REQUEST(CreateBinding, DoYandexQueryCreateBindingRequest, CreateBindingPermissions) + ADD_REQUEST(ListBindings, DoYandexQueryListBindingsRequest, ListBindingsPermissions) + ADD_REQUEST(DescribeBinding, DoYandexQueryDescribeBindingRequest, DescribeBindingPermissions) + ADD_REQUEST(ModifyBinding, DoYandexQueryModifyBindingRequest, ModifyBindingPermissions) + ADD_REQUEST(DeleteBinding, DoYandexQueryDeleteBindingRequest, DeleteBindingPermissions) #undef ADD_REQUEST |