aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authord-mokhnatkin <d-mokhnatkin@ydb.tech>2022-07-18 21:13:36 +0300
committerd-mokhnatkin <d-mokhnatkin@ydb.tech>2022-07-18 21:13:36 +0300
commit3041f85f9fa874cd45bf94d033fc4a95b3faf37f (patch)
treebab750c2de93eb51761e453869bdf6d25fe243a1
parent0a3c71dcf8547728a97969eb913292de315aebb9 (diff)
downloadydb-3041f85f9fa874cd45bf94d033fc4a95b3faf37f.tar.gz
add fq service
-rw-r--r--CMakeLists.darwin.txt3
-rw-r--r--CMakeLists.linux.txt3
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.txt1
-rw-r--r--ydb/core/driver_lib/run/run.cpp25
-rw-r--r--ydb/core/grpc_services/CMakeLists.txt2
-rw-r--r--ydb/core/grpc_services/rpc_fq.cpp444
-rw-r--r--ydb/core/grpc_services/rpc_fq_internal.cpp131
-rw-r--r--ydb/core/grpc_services/rpc_yq.cpp30
-rw-r--r--ydb/core/grpc_services/service_fq.h90
-rw-r--r--ydb/core/grpc_services/service_fq_internal.h17
-rw-r--r--ydb/core/grpc_services/service_yq.h28
-rw-r--r--ydb/core/testlib/CMakeLists.txt3
-rw-r--r--ydb/core/testlib/test_client.cpp4
-rw-r--r--ydb/core/yq/libs/control_plane_storage/message_builders.h184
-rw-r--r--ydb/core/yq/libs/control_plane_storage/message_builders_yq.h1305
-rw-r--r--ydb/core/yq/libs/private_client/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/private_client/private_client_fq.cpp182
-rw-r--r--ydb/core/yq/libs/private_client/private_client_fq.h82
-rw-r--r--ydb/core/yq/libs/protos/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/protos/fq_private.proto168
-rw-r--r--ydb/public/api/grpc/CMakeLists.txt3
-rw-r--r--ydb/public/api/grpc/draft/CMakeLists.txt1
-rw-r--r--ydb/public/api/grpc/draft/yql_db_v1_fq.proto19
-rw-r--r--ydb/public/api/grpc/fq_v1.proto79
-rw-r--r--ydb/public/api/protos/CMakeLists.txt1
-rw-r--r--ydb/public/api/protos/fq.proto743
-rw-r--r--ydb/public/api/protos/yq.proto2
-rw-r--r--ydb/public/lib/fq/CMakeLists.txt21
-rw-r--r--ydb/public/lib/fq/fq.cpp754
-rw-r--r--ydb/public/lib/fq/fq.h215
-rw-r--r--ydb/public/lib/fq/helpers.h25
-rw-r--r--ydb/public/lib/fq/scope.cpp9
-rw-r--r--ydb/public/lib/fq/scope.h36
-rw-r--r--ydb/services/fq/CMakeLists.txt24
-rw-r--r--ydb/services/fq/grpc_service.cpp278
-rw-r--r--ydb/services/fq/grpc_service.h36
-rw-r--r--ydb/services/fq/private_grpc.cpp66
-rw-r--r--ydb/services/fq/private_grpc.h34
-rw-r--r--ydb/services/fq/ut_integration/CMakeLists.darwin.txt61
-rw-r--r--ydb/services/fq/ut_integration/CMakeLists.linux.txt65
-rw-r--r--ydb/services/fq/ut_integration/CMakeLists.txt13
-rw-r--r--ydb/services/fq/ut_integration/fq_ut.cpp998
-rw-r--r--ydb/services/fq/ut_integration/ut_utils.cpp92
-rw-r--r--ydb/services/fq/ut_integration/ut_utils.h9
-rw-r--r--ydb/services/yq/grpc_service.cpp28
45 files changed, 6172 insertions, 144 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index d6157ca7859..660a32092f2 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -911,6 +911,7 @@ add_subdirectory(ydb/core/yq/libs/logs)
add_subdirectory(ydb/services/auth)
add_subdirectory(ydb/services/cms)
add_subdirectory(ydb/services/discovery)
+add_subdirectory(ydb/services/fq)
add_subdirectory(ydb/services/kesus)
add_subdirectory(ydb/services/local_discovery)
add_subdirectory(ydb/services/monitoring)
@@ -1081,9 +1082,11 @@ add_subdirectory(ydb/library/pretty_types_print/wilson)
add_subdirectory(ydb/library/protobuf_printer/ut)
add_subdirectory(ydb/library/schlab/ut)
add_subdirectory(ydb/library/security/ut)
+add_subdirectory(ydb/public/lib/fq)
add_subdirectory(ydb/public/lib/idx_test)
add_subdirectory(ydb/services/cms/ut)
add_subdirectory(ydb/services/datastreams/ut)
+add_subdirectory(ydb/services/fq/ut_integration)
add_subdirectory(ydb/services/persqueue_cluster_discovery/ut)
add_subdirectory(ydb/services/persqueue_v1/ut)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index faad06f7475..e28e5cc6605 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -934,6 +934,7 @@ add_subdirectory(ydb/core/yq/libs/logs)
add_subdirectory(ydb/services/auth)
add_subdirectory(ydb/services/cms)
add_subdirectory(ydb/services/discovery)
+add_subdirectory(ydb/services/fq)
add_subdirectory(ydb/services/kesus)
add_subdirectory(ydb/services/local_discovery)
add_subdirectory(ydb/services/monitoring)
@@ -1102,9 +1103,11 @@ add_subdirectory(ydb/library/pretty_types_print/wilson)
add_subdirectory(ydb/library/protobuf_printer/ut)
add_subdirectory(ydb/library/schlab/ut)
add_subdirectory(ydb/library/security/ut)
+add_subdirectory(ydb/public/lib/fq)
add_subdirectory(ydb/public/lib/idx_test)
add_subdirectory(ydb/services/cms/ut)
add_subdirectory(ydb/services/datastreams/ut)
+add_subdirectory(ydb/services/fq/ut_integration)
add_subdirectory(ydb/services/persqueue_cluster_discovery/ut)
add_subdirectory(ydb/services/persqueue_v1/ut)
add_subdirectory(ydb/public/sdk/cpp/client/ydb_topic)
diff --git a/ydb/core/driver_lib/run/CMakeLists.txt b/ydb/core/driver_lib/run/CMakeLists.txt
index d2a0b6d61d7..66c53f7ff6a 100644
--- a/ydb/core/driver_lib/run/CMakeLists.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.txt
@@ -117,6 +117,7 @@ target_link_libraries(run PUBLIC
ydb-services-cms
ydb-services-datastreams
ydb-services-discovery
+ ydb-services-fq
ydb-services-kesus
ydb-services-local_discovery
ydb-services-monitoring
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp
index f9cf13dc1ef..c063e630c05 100644
--- a/ydb/core/driver_lib/run/run.cpp
+++ b/ydb/core/driver_lib/run/run.cpp
@@ -78,31 +78,33 @@
#include <library/cpp/grpc/server/actors/logger.h>
-#include <ydb/services/yq/private_grpc.h>
+#include <ydb/services/auth/grpc_service.h>
#include <ydb/services/cms/grpc_service.h>
#include <ydb/services/datastreams/grpc_service.h>
+#include <ydb/services/discovery/grpc_service.h>
+#include <ydb/services/fq/grpc_service.h>
+#include <ydb/services/fq/private_grpc.h>
#include <ydb/services/kesus/grpc_service.h>
+#include <ydb/services/local_discovery/grpc_service.h>
#include <ydb/services/monitoring/grpc_service.h>
-#include <ydb/services/auth/grpc_service.h>
+#include <ydb/services/persqueue_cluster_discovery/grpc_service.h>
+#include <ydb/services/persqueue_v1/persqueue.h>
+#include <ydb/services/persqueue_v1/topic.h>
+#include <ydb/services/rate_limiter/grpc_service.h>
#include <ydb/services/ydb/ydb_clickhouse_internal.h>
#include <ydb/services/ydb/ydb_dummy.h>
#include <ydb/services/ydb/ydb_experimental.h>
#include <ydb/services/ydb/ydb_export.h>
#include <ydb/services/ydb/ydb_import.h>
+#include <ydb/services/ydb/ydb_logstore.h>
+#include <ydb/services/ydb/ydb_long_tx.h>
#include <ydb/services/ydb/ydb_operation.h>
#include <ydb/services/ydb/ydb_s3_internal.h>
#include <ydb/services/ydb/ydb_scheme.h>
#include <ydb/services/ydb/ydb_scripting.h>
#include <ydb/services/ydb/ydb_table.h>
-#include <ydb/services/ydb/ydb_long_tx.h>
-#include <ydb/services/ydb/ydb_logstore.h>
-#include <ydb/services/persqueue_cluster_discovery/grpc_service.h>
-#include <ydb/services/persqueue_v1/persqueue.h>
-#include <ydb/services/persqueue_v1/topic.h>
-#include <ydb/services/rate_limiter/grpc_service.h>
-#include <ydb/services/discovery/grpc_service.h>
-#include <ydb/services/local_discovery/grpc_service.h>
#include <ydb/services/yq/grpc_service.h>
+#include <ydb/services/yq/private_grpc.h>
#include <ydb/core/yq/libs/init/init.h>
@@ -737,10 +739,13 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
if (hasYandexQuery) {
server.AddService(new NGRpcService::TGRpcYandexQueryService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcFederatedQueryService(ActorSystem.Get(), Counters, grpcRequestProxyId));
// TODO: REMOVE next line after migration to "yq_private"
server.AddService(new NGRpcService::TGRpcYqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId));
} /* REMOVE */ else /* THIS else as well and separate ifs */ if (hasYandexQueryPrivate) {
server.AddService(new NGRpcService::TGRpcYqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId));
+ server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId));
}
if (hasLogStore) {
diff --git a/ydb/core/grpc_services/CMakeLists.txt b/ydb/core/grpc_services/CMakeLists.txt
index 658e23468b2..90ffceb87cc 100644
--- a/ydb/core/grpc_services/CMakeLists.txt
+++ b/ydb/core/grpc_services/CMakeLists.txt
@@ -85,6 +85,8 @@ target_sources(ydb-core-grpc_services PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_explain_yql_script.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_explain_data_query.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_forget_operation.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_fq_internal.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_fq.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_get_operation.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_get_shard_locations.cpp
${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_import.cpp
diff --git a/ydb/core/grpc_services/rpc_fq.cpp b/ydb/core/grpc_services/rpc_fq.cpp
new file mode 100644
index 00000000000..32fff6eecc4
--- /dev/null
+++ b/ydb/core/grpc_services/rpc_fq.cpp
@@ -0,0 +1,444 @@
+#include "rpc_common.h"
+#include "rpc_deferrable.h"
+
+#include <ydb/core/grpc_services/service_fq.h>
+#include <ydb/core/yq/libs/audit/events/events.h>
+#include <ydb/core/yq/libs/audit/yq_audit_service.h>
+#include <ydb/core/yq/libs/control_plane_proxy/control_plane_proxy.h>
+#include <ydb/core/yq/libs/control_plane_proxy/events/events.h>
+#include <ydb/core/yq/libs/control_plane_proxy/utils.h>
+#include <ydb/public/api/protos/fq.pb.h>
+
+#include <ydb/library/aclib/aclib.h>
+
+#include <library/cpp/actors/core/hfunc.h>
+
+#include <util/generic/guid.h>
+#include <util/string/split.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+using namespace Ydb;
+
+template <typename RpcRequestType, typename EvRequestType, typename EvResponseType, typename CastRequest, typename CastResult>
+class TFederatedQueryRequestRPC : public TRpcOperationRequestActor<
+ TFederatedQueryRequestRPC<RpcRequestType,EvRequestType,EvResponseType, CastRequest, CastResult>, RpcRequestType> {
+
+public:
+ using TBase = TRpcOperationRequestActor<
+ TFederatedQueryRequestRPC<RpcRequestType,EvRequestType,EvResponseType,CastRequest,CastResult>,
+ RpcRequestType>;
+ using TBase::Become;
+ using TBase::Send;
+ using TBase::PassAway;
+ using TBase::Request_;
+ using TBase::GetProtoRequest;
+
+protected:
+ TString Token;
+ TString FolderId;
+ TString User;
+ TString PeerName;
+ TString UserAgent;
+ TString RequestId;
+
+public:
+ TFederatedQueryRequestRPC(IRequestOpCtx* request)
+ : TBase(request) {}
+
+ void Bootstrap() {
+ auto requestCtx = Request_.get();
+
+ auto request = dynamic_cast<RpcRequestType*>(requestCtx);
+ Y_VERIFY(request);
+
+ auto proxyCtx = dynamic_cast<IRequestProxyCtx*>(requestCtx);
+ Y_VERIFY(proxyCtx);
+
+ PeerName = Request_->GetPeerName();
+ UserAgent = Request_->GetPeerMetaValues("user-agent").GetOrElse("empty");
+ RequestId = Request_->GetPeerMetaValues("x-request-id").GetOrElse(CreateGuidAsString());
+
+ TMaybe<TString> authToken = proxyCtx->GetYdbToken();
+ if (!authToken) {
+ ReplyWithStatus("Token is empty", StatusIds::BAD_REQUEST);
+ return;
+ }
+ Token = *authToken;
+
+ TString ydbProject = Request_->GetPeerMetaValues("x-ydb-fq-project").GetOrElse("");
+
+ if (!ydbProject.StartsWith("yandexcloud://")) {
+ ReplyWithStatus("x-ydb-fq-project should start with yandexcloud:// but got " + ydbProject, StatusIds::BAD_REQUEST);
+ return;
+ }
+
+ const TVector<TString> path = StringSplitter(ydbProject).Split('/').SkipEmpty();
+ if (path.size() != 2 && path.size() != 3) {
+ ReplyWithStatus("x-ydb-fq-project format is invalid. Must be yandexcloud://folder_id, but got " + ydbProject, StatusIds::BAD_REQUEST);
+ return;
+ }
+
+ FolderId = path.back();
+ if (!FolderId) {
+ ReplyWithStatus("Folder id is empty", StatusIds::BAD_REQUEST);
+ return;
+ }
+
+ if (FolderId.length() > 1024) {
+ ReplyWithStatus("Folder id length greater than 1024 characters: " + FolderId, StatusIds::BAD_REQUEST);
+ return;
+ }
+
+ const TString& internalToken = proxyCtx->GetInternalToken();
+ TVector<TString> permissions;
+ if (internalToken) {
+ NACLib::TUserToken userToken(internalToken);
+ User = userToken.GetUserSID();
+ for (const auto& sid: request->Sids) {
+ if (userToken.IsExist(sid)) {
+ permissions.push_back(sid);
+ }
+ }
+ }
+
+ if (!User) {
+ ReplyWithStatus("Authorization error. Permission denied", StatusIds::UNAUTHORIZED);
+ return;
+ }
+
+ const auto* req = GetProtoRequest();
+ TProtoStringType protoString;
+ if (!req->SerializeToString(&protoString)) {
+ ReplyWithStatus("Can't serialize proto", StatusIds::BAD_REQUEST);
+ return;
+ }
+ auto castedRequest = CastRequest();
+ if (!castedRequest.ParseFromString(protoString)) {
+ ReplyWithStatus("Can't deserialize proto", StatusIds::BAD_REQUEST);
+ return;
+ }
+ auto ev = MakeHolder<EvRequestType>(FolderId, castedRequest, User, Token, permissions);
+ Send(NYq::ControlPlaneProxyActorId(), ev.Release());
+ Become(&TFederatedQueryRequestRPC<RpcRequestType, EvRequestType, EvResponseType, CastRequest, CastResult>::StateFunc);
+ }
+
+protected:
+ void ReplyWithStatus(const TString& issueMessage, StatusIds::StatusCode status) {
+ Request_->RaiseIssue(NYql::TIssue(issueMessage));
+ Request_->ReplyWithYdbStatus(status);
+ PassAway();
+ }
+
+ STRICT_STFUNC(StateFunc,
+ hFunc(EvResponseType, Handle);
+ )
+
+ template <typename TResponse, typename TReq>
+ void SendResponse(const TResponse& response, TReq& req) {
+ if (response.Issues) {
+ req.RaiseIssues(response.Issues);
+ req.ReplyWithYdbStatus(StatusIds::BAD_REQUEST);
+ } else {
+ TProtoStringType protoString;
+ if (!response.Result.SerializeToString(&protoString)) {
+ ReplyWithStatus("Can't serialize proto", StatusIds::BAD_REQUEST);
+ return;
+ }
+ auto castedResult = CastResult();
+ if (!castedResult.ParseFromString(protoString)) {
+ ReplyWithStatus("Can't deserialize proto", StatusIds::BAD_REQUEST);
+ return;
+ }
+
+ req.SendResult(castedResult, StatusIds::SUCCESS);
+ }
+ }
+
+ template <typename TResponse, typename TReq> requires requires (TResponse r) { r.AuditDetails; }
+ void SendResponse(const TResponse& response, TReq& req) {
+ if (response.Issues) {
+ req.RaiseIssues(response.Issues);
+ req.ReplyWithYdbStatus(StatusIds::BAD_REQUEST);
+ } else {
+ TProtoStringType protoString;
+ if (!response.Result.SerializeToString(&protoString)) {
+ ReplyWithStatus("Can't serialize proto", StatusIds::BAD_REQUEST);
+ return;
+ }
+ auto castedResponse = CastResult();
+ if (!castedResponse.ParseFromString(protoString)) {
+ ReplyWithStatus("Can't deserialize proto", StatusIds::BAD_REQUEST);
+ return;
+ }
+ req.SendResult(castedResponse, StatusIds::SUCCESS);
+ }
+
+ NYq::TEvAuditService::TExtraInfo extraInfo{
+ .Token = Token,
+ .FolderId = FolderId,
+ .User = User,
+ .PeerName = PeerName,
+ .UserAgent = UserAgent,
+ .RequestId = RequestId,
+ };
+
+ const auto* protoReq = GetProtoRequest();
+ TProtoStringType protoString;
+ if (!protoReq->SerializeToString(&protoString)) {
+ ReplyWithStatus("Can't serialize proto", StatusIds::BAD_REQUEST);
+ return;
+ }
+ auto castedProtoRequest = CastRequest();
+ if (!castedProtoRequest.ParseFromString(protoString)) {
+ ReplyWithStatus("Can't deserialize proto", StatusIds::BAD_REQUEST);
+ return;
+ }
+
+ Send(NYq::YqAuditServiceActorId(), NYq::TEvAuditService::MakeAuditEvent(
+ std::move(extraInfo),
+ castedProtoRequest,
+ response.Issues,
+ response.AuditDetails));
+ }
+
+ void Handle(typename EvResponseType::TPtr& ev) {
+ SendResponse(*ev->Get(), *Request_);
+ PassAway();
+ }
+};
+
+using TFederatedQueryCreateQueryRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::CreateQueryRequest, FederatedQuery::CreateQueryResponse>,
+ NYq::TEvControlPlaneProxy::TEvCreateQueryRequest,
+ NYq::TEvControlPlaneProxy::TEvCreateQueryResponse,
+ YandexQuery::CreateQueryRequest,
+ FederatedQuery::CreateQueryResult>;
+
+void DoFederatedQueryCreateQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryCreateQueryRPC(p.release()));
+}
+
+using TFederatedQueryListQueriesRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::ListQueriesRequest, FederatedQuery::ListQueriesResponse>,
+ NYq::TEvControlPlaneProxy::TEvListQueriesRequest,
+ NYq::TEvControlPlaneProxy::TEvListQueriesResponse,
+ YandexQuery::ListQueriesRequest,
+ FederatedQuery::ListQueriesResult>;
+
+void DoFederatedQueryListQueriesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryListQueriesRPC(p.release()));
+}
+
+using TFederatedQueryDescribeQueryRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::DescribeQueryRequest, FederatedQuery::DescribeQueryResponse>,
+ NYq::TEvControlPlaneProxy::TEvDescribeQueryRequest,
+ NYq::TEvControlPlaneProxy::TEvDescribeQueryResponse,
+ YandexQuery::DescribeQueryRequest,
+ FederatedQuery::DescribeQueryResult>;
+
+void DoFederatedQueryDescribeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryDescribeQueryRPC(p.release()));
+}
+
+using TFederatedQueryGetQueryStatusRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::GetQueryStatusRequest, FederatedQuery::GetQueryStatusResponse>,
+ NYq::TEvControlPlaneProxy::TEvGetQueryStatusRequest,
+ NYq::TEvControlPlaneProxy::TEvGetQueryStatusResponse,
+ YandexQuery::GetQueryStatusRequest,
+ FederatedQuery::GetQueryStatusResult>;
+
+void DoFederatedQueryGetQueryStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryGetQueryStatusRPC(p.release()));
+}
+
+using TFederatedQueryModifyQueryRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::ModifyQueryRequest, FederatedQuery::ModifyQueryResponse>,
+ NYq::TEvControlPlaneProxy::TEvModifyQueryRequest,
+ NYq::TEvControlPlaneProxy::TEvModifyQueryResponse,
+ YandexQuery::ModifyQueryRequest,
+ FederatedQuery::ModifyQueryResult>;
+
+void DoFederatedQueryModifyQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryModifyQueryRPC(p.release()));
+}
+
+using TFederatedQueryDeleteQueryRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::DeleteQueryRequest, FederatedQuery::DeleteQueryResponse>,
+ NYq::TEvControlPlaneProxy::TEvDeleteQueryRequest,
+ NYq::TEvControlPlaneProxy::TEvDeleteQueryResponse,
+ YandexQuery::DeleteQueryRequest,
+ FederatedQuery::DeleteQueryResult>;
+
+void DoFederatedQueryDeleteQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryDeleteQueryRPC(p.release()));
+}
+
+using TFederatedQueryControlQueryRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::ControlQueryRequest, FederatedQuery::ControlQueryResponse>,
+ NYq::TEvControlPlaneProxy::TEvControlQueryRequest,
+ NYq::TEvControlPlaneProxy::TEvControlQueryResponse,
+ YandexQuery::ControlQueryRequest,
+ FederatedQuery::ControlQueryResult>;
+
+void DoFederatedQueryControlQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryControlQueryRPC(p.release()));
+}
+
+using TFederatedQueryGetResultDataRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::GetResultDataRequest, FederatedQuery::GetResultDataResponse>,
+ NYq::TEvControlPlaneProxy::TEvGetResultDataRequest,
+ NYq::TEvControlPlaneProxy::TEvGetResultDataResponse,
+ YandexQuery::GetResultDataRequest,
+ FederatedQuery::GetResultDataResult>;
+
+void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryGetResultDataRPC(p.release()));
+}
+
+using TFederatedQueryListJobsRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::ListJobsRequest, FederatedQuery::ListJobsResponse>,
+ NYq::TEvControlPlaneProxy::TEvListJobsRequest,
+ NYq::TEvControlPlaneProxy::TEvListJobsResponse,
+ YandexQuery::ListJobsRequest,
+ FederatedQuery::ListJobsResult>;
+
+void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryListJobsRPC(p.release()));
+}
+
+using TFederatedQueryDescribeJobRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::DescribeJobRequest, FederatedQuery::DescribeJobResponse>,
+ NYq::TEvControlPlaneProxy::TEvDescribeJobRequest,
+ NYq::TEvControlPlaneProxy::TEvDescribeJobResponse,
+ YandexQuery::DescribeJobRequest,
+ FederatedQuery::DescribeJobResult>;
+
+void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryDescribeJobRPC(p.release()));
+}
+
+using TFederatedQueryCreateConnectionRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::CreateConnectionRequest, FederatedQuery::CreateConnectionResponse>,
+ NYq::TEvControlPlaneProxy::TEvCreateConnectionRequest,
+ NYq::TEvControlPlaneProxy::TEvCreateConnectionResponse,
+ YandexQuery::CreateConnectionRequest,
+ FederatedQuery::CreateConnectionResult>;
+
+void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryCreateConnectionRPC(p.release()));
+}
+
+using TFederatedQueryListConnectionsRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::ListConnectionsRequest, FederatedQuery::ListConnectionsResponse>,
+ NYq::TEvControlPlaneProxy::TEvListConnectionsRequest,
+ NYq::TEvControlPlaneProxy::TEvListConnectionsResponse,
+ YandexQuery::ListConnectionsRequest,
+ FederatedQuery::ListConnectionsResult>;
+
+void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryListConnectionsRPC(p.release()));
+}
+
+using TFederatedQueryDescribeConnectionRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::DescribeConnectionRequest, FederatedQuery::DescribeConnectionResponse>,
+ NYq::TEvControlPlaneProxy::TEvDescribeConnectionRequest,
+ NYq::TEvControlPlaneProxy::TEvDescribeConnectionResponse,
+ YandexQuery::DescribeConnectionRequest,
+ FederatedQuery::DescribeConnectionResult>;
+
+void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryDescribeConnectionRPC(p.release()));
+}
+
+using TFederatedQueryModifyConnectionRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::ModifyConnectionRequest, FederatedQuery::ModifyConnectionResponse>,
+ NYq::TEvControlPlaneProxy::TEvModifyConnectionRequest,
+ NYq::TEvControlPlaneProxy::TEvModifyConnectionResponse,
+ YandexQuery::ModifyConnectionRequest,
+ FederatedQuery::ModifyConnectionResult>;
+
+void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryModifyConnectionRPC(p.release()));
+}
+
+using TFederatedQueryDeleteConnectionRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::DeleteConnectionRequest, FederatedQuery::DeleteConnectionResponse>,
+ NYq::TEvControlPlaneProxy::TEvDeleteConnectionRequest,
+ NYq::TEvControlPlaneProxy::TEvDeleteConnectionResponse,
+ YandexQuery::DeleteConnectionRequest,
+ FederatedQuery::DeleteConnectionResult>;
+
+void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryDeleteConnectionRPC(p.release()));
+}
+
+using TFederatedQueryTestConnectionRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::TestConnectionRequest, FederatedQuery::TestConnectionResponse>,
+ NYq::TEvControlPlaneProxy::TEvTestConnectionRequest,
+ NYq::TEvControlPlaneProxy::TEvTestConnectionResponse,
+ YandexQuery::TestConnectionRequest,
+ FederatedQuery::TestConnectionResult>;
+
+void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryTestConnectionRPC(p.release()));
+}
+
+using TFederatedQueryCreateBindingRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::CreateBindingRequest, FederatedQuery::CreateBindingResponse>,
+ NYq::TEvControlPlaneProxy::TEvCreateBindingRequest,
+ NYq::TEvControlPlaneProxy::TEvCreateBindingResponse,
+ YandexQuery::CreateBindingRequest,
+ FederatedQuery::CreateBindingResult>;
+
+void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& ) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryCreateBindingRPC(p.release()));
+}
+
+using TFederatedQueryListBindingsRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::ListBindingsRequest, FederatedQuery::ListBindingsResponse>,
+ NYq::TEvControlPlaneProxy::TEvListBindingsRequest,
+ NYq::TEvControlPlaneProxy::TEvListBindingsResponse,
+ YandexQuery::ListBindingsRequest,
+ FederatedQuery::ListBindingsResult>;
+
+void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryListBindingsRPC(p.release()));
+}
+
+using TFederatedQueryDescribeBindingRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::DescribeBindingRequest, FederatedQuery::DescribeBindingResponse>,
+ NYq::TEvControlPlaneProxy::TEvDescribeBindingRequest,
+ NYq::TEvControlPlaneProxy::TEvDescribeBindingResponse,
+ YandexQuery::DescribeBindingRequest,
+ FederatedQuery::DescribeBindingResult>;
+
+void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryDescribeBindingRPC(p.release()));
+}
+
+using TFederatedQueryModifyBindingRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::ModifyBindingRequest, FederatedQuery::ModifyBindingResponse>,
+ NYq::TEvControlPlaneProxy::TEvModifyBindingRequest,
+ NYq::TEvControlPlaneProxy::TEvModifyBindingResponse,
+ YandexQuery::ModifyBindingRequest,
+ FederatedQuery::ModifyBindingResult>;
+
+void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryModifyBindingRPC(p.release()));
+}
+
+using TFederatedQueryDeleteBindingRPC = TFederatedQueryRequestRPC<
+ TGrpcFqRequestOperationCall<FederatedQuery::DeleteBindingRequest, FederatedQuery::DeleteBindingResponse>,
+ NYq::TEvControlPlaneProxy::TEvDeleteBindingRequest,
+ NYq::TEvControlPlaneProxy::TEvDeleteBindingResponse,
+ YandexQuery::DeleteBindingRequest,
+ FederatedQuery::DeleteBindingResult>;
+
+void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFederatedQueryDeleteBindingRPC(p.release()));
+}
+
+} // namespace NGRpcService
+} // namespace NKikimr
diff --git a/ydb/core/grpc_services/rpc_fq_internal.cpp b/ydb/core/grpc_services/rpc_fq_internal.cpp
new file mode 100644
index 00000000000..b2f9c60c796
--- /dev/null
+++ b/ydb/core/grpc_services/rpc_fq_internal.cpp
@@ -0,0 +1,131 @@
+#include "service_fq_internal.h"
+#include "rpc_common.h"
+#include "rpc_deferrable.h"
+
+#include <ydb/core/yq/libs/events/events.h>
+#include <ydb/core/yq/libs/actors/proxy_private.h>
+
+#include <library/cpp/actors/core/hfunc.h>
+
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/yq/libs/protos/fq_private.pb.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+using TEvFqPrivatePingTaskRequest =
+ TGrpcRequestOperationCall<Fq::Private::PingTaskRequest, Fq::Private::PingTaskResponse>;
+using TEvFqPrivateGetTaskRequest =
+ TGrpcRequestOperationCall<Fq::Private::GetTaskRequest, Fq::Private::GetTaskResponse>;
+using TEvFqPrivateWriteTaskResultRequest =
+ TGrpcRequestOperationCall<Fq::Private::WriteTaskResultRequest, Fq::Private::WriteTaskResultResponse>;
+using TEvFqPrivateNodesHealthCheckRequest =
+ TGrpcRequestOperationCall<Fq::Private::NodesHealthCheckRequest, Fq::Private::NodesHealthCheckResponse>;
+
+template <typename RpcRequestType, typename EvRequestType, typename EvResponseType, typename CastRequest, typename CastResult>
+class TFqPrivateRequestRPC : public TRpcOperationRequestActor<
+ TFqPrivateRequestRPC<RpcRequestType,EvRequestType,EvResponseType,CastRequest,CastResult>, RpcRequestType> {
+
+ using TBase = TRpcOperationRequestActor<
+ TFqPrivateRequestRPC<RpcRequestType,EvRequestType,EvResponseType,CastRequest,CastResult>,
+ RpcRequestType>;
+
+public:
+ TFqPrivateRequestRPC(IRequestOpCtx* request) : TBase(request) {}
+
+ void Bootstrap(const TActorContext& ctx) {
+ Y_UNUSED(ctx);
+ const auto req = this->GetProtoRequest();
+ TProtoStringType protoString;
+ Y_ENSURE(req->SerializeToString(&protoString));
+ auto castedRequest = CastRequest();
+ Y_ENSURE(castedRequest.ParseFromString(protoString));
+ auto ev = MakeHolder<EvRequestType>();
+ auto request = dynamic_cast<RpcRequestType*>(this->Request_.get());
+ Y_VERIFY(request);
+ auto proxyCtx = dynamic_cast<IRequestProxyCtx*>(request);
+ Y_VERIFY(proxyCtx);
+ TString user;
+ const TString& internalToken = proxyCtx->GetInternalToken();
+ if (internalToken) {
+ NACLib::TUserToken userToken(internalToken);
+ user = userToken.GetUserSID();
+ }
+ ev->Record = castedRequest;
+ ev->User = user;
+ this->Send(NYq::MakeYqPrivateProxyId(), ev.Release());
+ this->Become(&TFqPrivateRequestRPC<RpcRequestType, EvRequestType, EvResponseType, CastRequest, CastResult>::StateFunc);
+ }
+
+private:
+ STRICT_STFUNC(StateFunc,
+ HFunc(EvResponseType, Handle);
+ )
+
+ void Handle(typename EvResponseType::TPtr& ev, const TActorContext& ctx) {
+ SendResponse(ev, *this->Request_);
+ this->Die(ctx);
+ }
+
+ template <typename TEv, typename TReq>
+ void SendResponse(const TEv& ev, TReq& req) {
+ if (!ev->Get()->Record) {
+ req.RaiseIssues(ev->Get()->Issues);
+ req.ReplyWithYdbStatus(ev->Get()->Status);
+ } else {
+ TProtoStringType protoString;
+ Y_ENSURE(ev->Get()->Record->SerializeToString(&protoString));
+ auto castedResult = CastResult();
+ Y_ENSURE(castedResult.ParseFromString(protoString));
+
+ req.SendResult(castedResult, ev->Get()->Status);
+ }
+ }
+};
+
+using TFqPrivatePingTaskRPC = TFqPrivateRequestRPC<
+ TEvFqPrivatePingTaskRequest,
+ NYq::TEvents::TEvPingTaskRequest,
+ NYq::TEvents::TEvPingTaskResponse,
+ Yq::Private::PingTaskRequest,
+ Fq::Private::PingTaskResult>;
+
+using TFqPrivateGetTaskRPC = TFqPrivateRequestRPC<
+ TEvFqPrivateGetTaskRequest,
+ NYq::TEvents::TEvGetTaskRequest,
+ NYq::TEvents::TEvGetTaskResponse,
+ Yq::Private::GetTaskRequest,
+ Fq::Private::GetTaskResult>;
+
+using TFqPrivateWriteTaskResultRPC = TFqPrivateRequestRPC<
+ TEvFqPrivateWriteTaskResultRequest,
+ NYq::TEvents::TEvWriteTaskResultRequest,
+ NYq::TEvents::TEvWriteTaskResultResponse,
+ Yq::Private::WriteTaskResultRequest,
+ Fq::Private::WriteTaskResultResult>;
+
+using TFqPrivateNodesHealthCheckRPC = TFqPrivateRequestRPC<
+ TEvFqPrivateNodesHealthCheckRequest,
+ NYq::TEvents::TEvNodesHealthCheckRequest,
+ NYq::TEvents::TEvNodesHealthCheckResponse,
+ Yq::Private::NodesHealthCheckRequest,
+ Fq::Private::NodesHealthCheckResult>;
+
+void DoFqPrivatePingTaskRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFqPrivatePingTaskRPC(p.release()));
+}
+
+void DoFqPrivateGetTaskRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFqPrivateGetTaskRPC(p.release()));
+}
+
+void DoFqPrivateWriteTaskResultRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFqPrivateWriteTaskResultRPC(p.release()));
+}
+
+void DoFqPrivateNodesHealthCheckRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+ TActivationContext::AsActorContext().Register(new TFqPrivateNodesHealthCheckRPC(p.release()));
+}
+
+} // namespace NGRpcService
+} // namespace NKikimr
diff --git a/ydb/core/grpc_services/rpc_yq.cpp b/ydb/core/grpc_services/rpc_yq.cpp
index 0179f36cab6..0a43bc12acb 100644
--- a/ydb/core/grpc_services/rpc_yq.cpp
+++ b/ydb/core/grpc_services/rpc_yq.cpp
@@ -1,3 +1,5 @@
+// DEPRECATED!!! use rpc_fq.cpp
+
#include "rpc_common.h"
#include "rpc_deferrable.h"
@@ -236,7 +238,7 @@ using TYandexQueryGetResultDataRPC = TYandexQueryRequestRPC<
NYq::TEvControlPlaneProxy::TEvGetResultDataRequest,
NYq::TEvControlPlaneProxy::TEvGetResultDataResponse>;
-void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+void DoYandexQueryGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
TActivationContext::AsActorContext().Register(new TYandexQueryGetResultDataRPC(p.release()));
}
@@ -245,7 +247,7 @@ using TYandexQueryListJobsRPC = TYandexQueryRequestRPC<
NYq::TEvControlPlaneProxy::TEvListJobsRequest,
NYq::TEvControlPlaneProxy::TEvListJobsResponse>;
-void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+void DoYandexQueryListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
TActivationContext::AsActorContext().Register(new TYandexQueryListJobsRPC(p.release()));
}
@@ -254,7 +256,7 @@ using TYandexQueryDescribeJobRPC = TYandexQueryRequestRPC<
NYq::TEvControlPlaneProxy::TEvDescribeJobRequest,
NYq::TEvControlPlaneProxy::TEvDescribeJobResponse>;
-void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+void DoYandexQueryDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
TActivationContext::AsActorContext().Register(new TYandexQueryDescribeJobRPC(p.release()));
}
@@ -263,7 +265,7 @@ using TYandexQueryCreateConnectionRPC = TYandexQueryRequestRPC<
NYq::TEvControlPlaneProxy::TEvCreateConnectionRequest,
NYq::TEvControlPlaneProxy::TEvCreateConnectionResponse>;
-void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+void DoYandexQueryCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
TActivationContext::AsActorContext().Register(new TYandexQueryCreateConnectionRPC(p.release()));
}
@@ -272,7 +274,7 @@ using TYandexQueryListConnectionsRPC = TYandexQueryRequestRPC<
NYq::TEvControlPlaneProxy::TEvListConnectionsRequest,
NYq::TEvControlPlaneProxy::TEvListConnectionsResponse>;
-void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+void DoYandexQueryListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
TActivationContext::AsActorContext().Register(new TYandexQueryListConnectionsRPC(p.release()));
}
@@ -281,7 +283,7 @@ using TYandexQueryDescribeConnectionRPC = TYandexQueryRequestRPC<
NYq::TEvControlPlaneProxy::TEvDescribeConnectionRequest,
NYq::TEvControlPlaneProxy::TEvDescribeConnectionResponse>;
-void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+void DoYandexQueryDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
TActivationContext::AsActorContext().Register(new TYandexQueryDescribeConnectionRPC(p.release()));
}
@@ -290,7 +292,7 @@ using TYandexQueryModifyConnectionRPC = TYandexQueryRequestRPC<
NYq::TEvControlPlaneProxy::TEvModifyConnectionRequest,
NYq::TEvControlPlaneProxy::TEvModifyConnectionResponse>;
-void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+void DoYandexQueryModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
TActivationContext::AsActorContext().Register(new TYandexQueryModifyConnectionRPC(p.release()));
}
@@ -299,7 +301,7 @@ using TYandexQueryDeleteConnectionRPC = TYandexQueryRequestRPC<
NYq::TEvControlPlaneProxy::TEvDeleteConnectionRequest,
NYq::TEvControlPlaneProxy::TEvDeleteConnectionResponse>;
-void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+void DoYandexQueryDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
TActivationContext::AsActorContext().Register(new TYandexQueryDeleteConnectionRPC(p.release()));
}
@@ -308,7 +310,7 @@ using TYandexQueryTestConnectionRPC = TYandexQueryRequestRPC<
NYq::TEvControlPlaneProxy::TEvTestConnectionRequest,
NYq::TEvControlPlaneProxy::TEvTestConnectionResponse>;
-void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+void DoYandexQueryTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
TActivationContext::AsActorContext().Register(new TYandexQueryTestConnectionRPC(p.release()));
}
@@ -317,7 +319,7 @@ using TYandexQueryCreateBindingRPC = TYandexQueryRequestRPC<
NYq::TEvControlPlaneProxy::TEvCreateBindingRequest,
NYq::TEvControlPlaneProxy::TEvCreateBindingResponse>;
-void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& ) {
+void DoYandexQueryCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& ) {
TActivationContext::AsActorContext().Register(new TYandexQueryCreateBindingRPC(p.release()));
}
@@ -326,7 +328,7 @@ using TYandexQueryListBindingsRPC = TYandexQueryRequestRPC<
NYq::TEvControlPlaneProxy::TEvListBindingsRequest,
NYq::TEvControlPlaneProxy::TEvListBindingsResponse>;
-void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+void DoYandexQueryListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
TActivationContext::AsActorContext().Register(new TYandexQueryListBindingsRPC(p.release()));
}
@@ -335,7 +337,7 @@ using TYandexQueryDescribeBindingRPC = TYandexQueryRequestRPC<
NYq::TEvControlPlaneProxy::TEvDescribeBindingRequest,
NYq::TEvControlPlaneProxy::TEvDescribeBindingResponse>;
-void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+void DoYandexQueryDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
TActivationContext::AsActorContext().Register(new TYandexQueryDescribeBindingRPC(p.release()));
}
@@ -344,7 +346,7 @@ using TYandexQueryModifyBindingRPC = TYandexQueryRequestRPC<
NYq::TEvControlPlaneProxy::TEvModifyBindingRequest,
NYq::TEvControlPlaneProxy::TEvModifyBindingResponse>;
-void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+void DoYandexQueryModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
TActivationContext::AsActorContext().Register(new TYandexQueryModifyBindingRPC(p.release()));
}
@@ -353,7 +355,7 @@ using TYandexQueryDeleteBindingRPC = TYandexQueryRequestRPC<
NYq::TEvControlPlaneProxy::TEvDeleteBindingRequest,
NYq::TEvControlPlaneProxy::TEvDeleteBindingResponse>;
-void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
+void DoYandexQueryDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) {
TActivationContext::AsActorContext().Register(new TYandexQueryDeleteBindingRPC(p.release()));
}
diff --git a/ydb/core/grpc_services/service_fq.h b/ydb/core/grpc_services/service_fq.h
new file mode 100644
index 00000000000..99e52fbb137
--- /dev/null
+++ b/ydb/core/grpc_services/service_fq.h
@@ -0,0 +1,90 @@
+#pragma once
+
+#include <algorithm>
+#include <memory>
+
+#include <ydb/core/base/ticket_parser.h>
+#include <ydb/core/yq/libs/control_plane_proxy/utils.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class IRequestOpCtx;
+class IFacilityProvider;
+
+template <typename TReq, typename TResp>
+class TGrpcFqRequestOperationCall : public TGrpcRequestOperationCall<TReq, TResp> {
+public:
+ using TBase = TGrpcRequestOperationCall<TReq, TResp>;
+ using TBase::GetProtoRequest;
+ using TBase::GetPeerMetaValues;
+ using NPerms = NKikimr::TEvTicketParser::TEvAuthorizeTicket;
+
+ const std::function<TVector<NPerms::TPermission>(const TReq&)>& Permissions;
+ TVector<TString> Sids;
+
+ TGrpcFqRequestOperationCall(NGrpc::IRequestContextBase* ctx,
+ void (*cb)(std::unique_ptr<IRequestOpCtx>, const IFacilityProvider&),
+ const std::function<TVector<NPerms::TPermission>(const TReq&)>& permissions)
+ : TGrpcRequestOperationCall<TReq, TResp>(ctx, cb, {}), Permissions(permissions) {
+ }
+
+ bool TryCustomAttributeProcess(const TSchemeBoardEvents::TDescribeSchemeResult& , ICheckerIface* iface) override {
+ TString scope = GetPeerMetaValues("x-ydb-fq-project").GetOrElse("");
+ if (scope.StartsWith("yandexcloud://")) {
+ const TVector<TString> path = StringSplitter(scope).Split('/').SkipEmpty();
+ if (path.size() == 2 || path.size() == 3) {
+ const TString& folderId = path.back();
+ const auto& permissions = Permissions(*GetProtoRequest());
+ TVector<TEvTicketParser::TEvAuthorizeTicket::TEntry> entries {{
+ permissions,
+ {
+ {"folder_id", folderId}
+ }
+ }};
+ std::transform(permissions.begin(), permissions.end(), std::back_inserter(Sids),
+ [](const auto& s) -> TString { return s.Permission + "@as"; });
+
+ auto serviceAccountId = NYq::ExtractServiceAccountId(*GetProtoRequest());
+ if (serviceAccountId) {
+ entries.push_back({
+ {{NPerms::Required("iam.serviceAccounts.use")}},
+ {
+ {"service_account_id", serviceAccountId}
+ }});
+ Sids.push_back("iam.serviceAccounts.use@as");
+ }
+
+ iface->SetEntries(entries);
+ return true;
+ }
+ }
+
+ return false;
+ }
+};
+
+void DoFederatedQueryCreateQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoFederatedQueryListQueriesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoFederatedQueryDescribeQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoFederatedQueryGetQueryStatusRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoFederatedQueryModifyQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoFederatedQueryDeleteQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoFederatedQueryControlQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+
+} // namespace NGRpcService
+} // namespace NKikimr
diff --git a/ydb/core/grpc_services/service_fq_internal.h b/ydb/core/grpc_services/service_fq_internal.h
new file mode 100644
index 00000000000..2b090de59ba
--- /dev/null
+++ b/ydb/core/grpc_services/service_fq_internal.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#include <memory>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class IRequestOpCtx;
+class IFacilityProvider;
+
+void DoFqPrivatePingTaskRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoFqPrivateGetTaskRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoFqPrivateWriteTaskResultRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoFqPrivateNodesHealthCheckRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+
+} // namespace NGRpcService
+} // namespace NKikimr
diff --git a/ydb/core/grpc_services/service_yq.h b/ydb/core/grpc_services/service_yq.h
index 612aea4ec7d..3afc2180e27 100644
--- a/ydb/core/grpc_services/service_yq.h
+++ b/ydb/core/grpc_services/service_yq.h
@@ -74,20 +74,20 @@ void DoYandexQueryGetQueryStatusRequest(std::unique_ptr<IRequestOpCtx> p, const
void DoYandexQueryModifyQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
void DoYandexQueryDeleteQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
void DoYandexQueryControlQueryRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
-void DoGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
-void DoListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
-void DoDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
-void DoCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
-void DoListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
-void DoDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
-void DoModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
-void DoDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
-void DoTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
-void DoCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
-void DoListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
-void DoDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
-void DoModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
-void DoDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryGetResultDataRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryListJobsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryDescribeJobRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryCreateConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryListConnectionsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryDescribeConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryModifyConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryDeleteConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryTestConnectionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryCreateBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryListBindingsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryDescribeBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryModifyBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
+void DoYandexQueryDeleteBindingRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility);
} // namespace NGRpcService
} // namespace NKikimr
diff --git a/ydb/core/testlib/CMakeLists.txt b/ydb/core/testlib/CMakeLists.txt
index 8d386751ae2..f0411e6c092 100644
--- a/ydb/core/testlib/CMakeLists.txt
+++ b/ydb/core/testlib/CMakeLists.txt
@@ -45,8 +45,8 @@ target_link_libraries(ydb-core-testlib PUBLIC
core-mind-bscontroller
core-mind-hive
ydb-core-node_whiteboard
- ydb-core-protos
ydb-core-persqueue
+ ydb-core-protos
ydb-core-security
core-sys_view-processor
core-sys_view-service
@@ -80,6 +80,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-services-cms
ydb-services-datastreams
ydb-services-discovery
+ ydb-services-fq
ydb-services-kesus
ydb-services-persqueue_cluster_discovery
ydb-services-persqueue_v1
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 0ccf2f38270..d1319f60597 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -8,6 +8,8 @@
#include <ydb/services/auth/grpc_service.h>
#include <ydb/services/yq/grpc_service.h>
#include <ydb/services/yq/private_grpc.h>
+#include <ydb/services/fq/grpc_service.h>
+#include <ydb/services/fq/private_grpc.h>
#include <ydb/services/cms/grpc_service.h>
#include <ydb/services/datastreams/grpc_service.h>
#include <ydb/services/kesus/grpc_service.h>
@@ -328,6 +330,8 @@ namespace Tests {
if (Settings->EnableYq) {
GRpcServer->AddService(new NGRpcService::TGRpcYandexQueryService(system, counters, grpcRequestProxyId));
GRpcServer->AddService(new NGRpcService::TGRpcYqPrivateTaskService(system, counters, grpcRequestProxyId));
+ GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxyId));
+ GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxyId));
}
if (const auto& factory = Settings->GrpcServiceFactory) {
// All services enabled by default for ut
diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h
index c0f82ed566b..09cfddad7f3 100644
--- a/ydb/core/yq/libs/control_plane_storage/message_builders.h
+++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h
@@ -1,39 +1,41 @@
#pragma once
+#include "message_builders_yq.h"
+
#include <util/datetime/base.h>
#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
-#include <ydb/public/api/protos/yq.pb.h>
+#include <ydb/public/api/protos/fq.pb.h>
#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
#include <library/cpp/protobuf/interop/cast.h>
-namespace NYq {
+namespace NFq {
// Queries
class TCreateQueryBuilder {
- YandexQuery::CreateQueryRequest Request;
+ FederatedQuery::CreateQueryRequest Request;
public:
TCreateQueryBuilder()
{
- SetMode(YandexQuery::RUN);
- SetType(YandexQuery::QueryContent::ANALYTICS);
+ SetMode(FederatedQuery::RUN);
+ SetType(FederatedQuery::QueryContent::ANALYTICS);
SetName("test_query_name_1");
- SetVisibility(YandexQuery::Acl::SCOPE);
+ SetVisibility(FederatedQuery::Acl::SCOPE);
SetText("SELECT 1;");
}
- TCreateQueryBuilder& SetMode(YandexQuery::ExecuteMode mode)
+ TCreateQueryBuilder& SetMode(FederatedQuery::ExecuteMode mode)
{
Request.set_execute_mode(mode);
return *this;
}
- TCreateQueryBuilder& SetType(YandexQuery::QueryContent::QueryType type)
+ TCreateQueryBuilder& SetType(FederatedQuery::QueryContent::QueryType type)
{
Request.mutable_content()->set_type(type);
return *this;
@@ -45,7 +47,7 @@ public:
return *this;
}
- TCreateQueryBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility)
+ TCreateQueryBuilder& SetVisibility(FederatedQuery::Acl::Visibility visibility)
{
Request.mutable_content()->mutable_acl()->set_visibility(visibility);
return *this;
@@ -69,7 +71,7 @@ public:
return *this;
}
- TCreateQueryBuilder& SetDisposition(const YandexQuery::StreamingDisposition& disposition)
+ TCreateQueryBuilder& SetDisposition(const FederatedQuery::StreamingDisposition& disposition)
{
*Request.mutable_disposition() = disposition;
return *this;
@@ -81,14 +83,14 @@ public:
return *this;
}
- const YandexQuery::CreateQueryRequest& Build()
+ const FederatedQuery::CreateQueryRequest& Build()
{
return Request;
}
};
class TListQueriesBuilder {
- YandexQuery::ListQueriesRequest Request;
+ FederatedQuery::ListQueriesRequest Request;
public:
TListQueriesBuilder()
@@ -108,14 +110,14 @@ public:
return *this;
}
- const YandexQuery::ListQueriesRequest& Build()
+ const FederatedQuery::ListQueriesRequest& Build()
{
return Request;
}
};
class TDescribeQueryBuilder {
- YandexQuery::DescribeQueryRequest Request;
+ FederatedQuery::DescribeQueryRequest Request;
public:
TDescribeQueryBuilder& SetQueryId(const TString& queryId)
@@ -124,14 +126,14 @@ public:
return *this;
}
- const YandexQuery::DescribeQueryRequest& Build()
+ const FederatedQuery::DescribeQueryRequest& Build()
{
return Request;
}
};
class TGetQueryStatusBuilder {
- YandexQuery::GetQueryStatusRequest Request;
+ FederatedQuery::GetQueryStatusRequest Request;
public:
TGetQueryStatusBuilder& SetQueryId(const TString& queryId)
@@ -140,14 +142,14 @@ public:
return *this;
}
- const YandexQuery::GetQueryStatusRequest& Build()
+ const FederatedQuery::GetQueryStatusRequest& Build()
{
return Request;
}
};
class TDeleteQueryBuilder {
- YandexQuery::DeleteQueryRequest Request;
+ FederatedQuery::DeleteQueryRequest Request;
public:
TDeleteQueryBuilder& SetQueryId(const TString& queryId)
@@ -168,22 +170,22 @@ public:
return *this;
}
- const YandexQuery::DeleteQueryRequest& Build()
+ const FederatedQuery::DeleteQueryRequest& Build()
{
return Request;
}
};
class TModifyQueryBuilder {
- YandexQuery::ModifyQueryRequest Request;
+ FederatedQuery::ModifyQueryRequest Request;
public:
TModifyQueryBuilder()
{
SetName("test_query_name_2");
- SetMode(YandexQuery::RUN);
- SetType(YandexQuery::QueryContent::ANALYTICS);
- SetVisibility(YandexQuery::Acl::SCOPE);
+ SetMode(FederatedQuery::RUN);
+ SetType(FederatedQuery::QueryContent::ANALYTICS);
+ SetVisibility(FederatedQuery::Acl::SCOPE);
SetText("SELECT 1;");
}
@@ -193,19 +195,19 @@ public:
return *this;
}
- TModifyQueryBuilder& SetType(YandexQuery::QueryContent::QueryType type)
+ TModifyQueryBuilder& SetType(FederatedQuery::QueryContent::QueryType type)
{
Request.mutable_content()->set_type(type);
return *this;
}
- TModifyQueryBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility)
+ TModifyQueryBuilder& SetVisibility(FederatedQuery::Acl::Visibility visibility)
{
Request.mutable_content()->mutable_acl()->set_visibility(visibility);
return *this;
}
- TModifyQueryBuilder& SetMode(YandexQuery::ExecuteMode mode)
+ TModifyQueryBuilder& SetMode(FederatedQuery::ExecuteMode mode)
{
Request.set_execute_mode(mode);
return *this;
@@ -223,13 +225,13 @@ public:
return *this;
}
- TModifyQueryBuilder& SetDisposition(const YandexQuery::StreamingDisposition& disposition)
+ TModifyQueryBuilder& SetDisposition(const FederatedQuery::StreamingDisposition& disposition)
{
*Request.mutable_disposition() = disposition;
return *this;
}
- TModifyQueryBuilder& SetState(const YandexQuery::StateLoadMode& state)
+ TModifyQueryBuilder& SetState(const FederatedQuery::StateLoadMode& state)
{
Request.set_state_load_mode(state);
return *this;
@@ -259,22 +261,22 @@ public:
return *this;
}
- const YandexQuery::ModifyQueryRequest& Build()
+ const FederatedQuery::ModifyQueryRequest& Build()
{
return Request;
}
};
class TControlQueryBuilder {
- YandexQuery::ControlQueryRequest Request;
+ FederatedQuery::ControlQueryRequest Request;
public:
TControlQueryBuilder()
{
- SetAction(YandexQuery::ABORT);
+ SetAction(FederatedQuery::ABORT);
}
- TControlQueryBuilder& SetAction(const YandexQuery::QueryAction& action)
+ TControlQueryBuilder& SetAction(const FederatedQuery::QueryAction& action)
{
Request.set_action(action);
return *this;
@@ -298,7 +300,7 @@ public:
return *this;
}
- const YandexQuery::ControlQueryRequest& Build()
+ const FederatedQuery::ControlQueryRequest& Build()
{
return Request;
}
@@ -307,7 +309,7 @@ public:
// Results
class TGetResultDataBuilder {
- YandexQuery::GetResultDataRequest Request;
+ FederatedQuery::GetResultDataRequest Request;
public:
TGetResultDataBuilder()
@@ -339,7 +341,7 @@ public:
return *this;
}
- const YandexQuery::GetResultDataRequest& Build()
+ const FederatedQuery::GetResultDataRequest& Build()
{
return Request;
}
@@ -348,7 +350,7 @@ public:
// Jobs
class TListJobsBuilder {
- YandexQuery::ListJobsRequest Request;
+ FederatedQuery::ListJobsRequest Request;
public:
TListJobsBuilder()
@@ -374,14 +376,14 @@ public:
return *this;
}
- const YandexQuery::ListJobsRequest& Build()
+ const FederatedQuery::ListJobsRequest& Build()
{
return Request;
}
};
class TDescribeJobBuilder {
- YandexQuery::DescribeJobRequest Request;
+ FederatedQuery::DescribeJobRequest Request;
public:
TDescribeJobBuilder& SetJobId(const TString& jobId)
@@ -390,7 +392,7 @@ public:
return *this;
}
- const YandexQuery::DescribeJobRequest& Build()
+ const FederatedQuery::DescribeJobRequest& Build()
{
return Request;
}
@@ -399,13 +401,13 @@ public:
// Connections
class TCreateConnectionBuilder {
- YandexQuery::CreateConnectionRequest Request;
+ FederatedQuery::CreateConnectionRequest Request;
public:
TCreateConnectionBuilder()
{
SetName("test_connection_name_1");
- SetVisibility(YandexQuery::Acl::SCOPE);
+ SetVisibility(FederatedQuery::Acl::SCOPE);
CreateDataStreams("my_database_id", "");
}
@@ -477,7 +479,7 @@ public:
return *this;
}
- TCreateConnectionBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility)
+ TCreateConnectionBuilder& SetVisibility(FederatedQuery::Acl::Visibility visibility)
{
Request.mutable_content()->mutable_acl()->set_visibility(visibility);
return *this;
@@ -501,14 +503,14 @@ public:
return *this;
}
- const YandexQuery::CreateConnectionRequest& Build()
+ const FederatedQuery::CreateConnectionRequest& Build()
{
return Request;
}
};
class TListConnectionsBuilder {
- YandexQuery::ListConnectionsRequest Request;
+ FederatedQuery::ListConnectionsRequest Request;
public:
TListConnectionsBuilder()
@@ -528,14 +530,14 @@ public:
return *this;
}
- const YandexQuery::ListConnectionsRequest& Build()
+ const FederatedQuery::ListConnectionsRequest& Build()
{
return Request;
}
};
class TDescribeConnectionBuilder {
- YandexQuery::DescribeConnectionRequest Request;
+ FederatedQuery::DescribeConnectionRequest Request;
public:
TDescribeConnectionBuilder& SetConnectionId(const TString& connectionId)
@@ -544,20 +546,20 @@ public:
return *this;
}
- const YandexQuery::DescribeConnectionRequest& Build()
+ const FederatedQuery::DescribeConnectionRequest& Build()
{
return Request;
}
};
class TModifyConnectionBuilder {
- YandexQuery::ModifyConnectionRequest Request;
+ FederatedQuery::ModifyConnectionRequest Request;
public:
TModifyConnectionBuilder()
{
SetName("test_connection_name_2");
- SetVisibility(YandexQuery::Acl::SCOPE);
+ SetVisibility(FederatedQuery::Acl::SCOPE);
CreateDataStreams("my_database_id", "");
}
@@ -615,7 +617,7 @@ public:
return *this;
}
- TModifyConnectionBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility)
+ TModifyConnectionBuilder& SetVisibility(FederatedQuery::Acl::Visibility visibility)
{
Request.mutable_content()->mutable_acl()->set_visibility(visibility);
return *this;
@@ -651,14 +653,14 @@ public:
return *this;
}
- const YandexQuery::ModifyConnectionRequest& Build()
+ const FederatedQuery::ModifyConnectionRequest& Build()
{
return Request;
}
};
class TDeleteConnectionBuilder {
- YandexQuery::DeleteConnectionRequest Request;
+ FederatedQuery::DeleteConnectionRequest Request;
public:
TDeleteConnectionBuilder& SetConnectionId(const TString& connectionId)
@@ -679,7 +681,7 @@ public:
return *this;
}
- const YandexQuery::DeleteConnectionRequest& Build()
+ const FederatedQuery::DeleteConnectionRequest& Build()
{
return Request;
}
@@ -688,14 +690,14 @@ public:
// Bindings
class TCreateBindingBuilder {
- YandexQuery::CreateBindingRequest Request;
+ FederatedQuery::CreateBindingRequest Request;
public:
TCreateBindingBuilder()
{
SetName("test_binding_name_1");
- SetVisibility(YandexQuery::Acl::SCOPE);
- YandexQuery::DataStreamsBinding binding;
+ SetVisibility(FederatedQuery::Acl::SCOPE);
+ FederatedQuery::DataStreamsBinding binding;
binding.set_stream_name("my_stream");
binding.set_format("json");
binding.set_compression("zip");
@@ -711,19 +713,19 @@ public:
return *this;
}
- TCreateBindingBuilder& CreateDataStreams(const YandexQuery::DataStreamsBinding& binding)
+ TCreateBindingBuilder& CreateDataStreams(const FederatedQuery::DataStreamsBinding& binding)
{
*Request.mutable_content()->mutable_setting()->mutable_data_streams() = binding;
return *this;
}
- TCreateBindingBuilder& CreateObjectStorage(const YandexQuery::ObjectStorageBinding& binding)
+ TCreateBindingBuilder& CreateObjectStorage(const FederatedQuery::ObjectStorageBinding& binding)
{
*Request.mutable_content()->mutable_setting()->mutable_object_storage() = binding;
return *this;
}
- TCreateBindingBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility)
+ TCreateBindingBuilder& SetVisibility(FederatedQuery::Acl::Visibility visibility)
{
Request.mutable_content()->mutable_acl()->set_visibility(visibility);
return *this;
@@ -747,14 +749,14 @@ public:
return *this;
}
- const YandexQuery::CreateBindingRequest& Build()
+ const FederatedQuery::CreateBindingRequest& Build()
{
return Request;
}
};
class TListBindingsBuilder {
- YandexQuery::ListBindingsRequest Request;
+ FederatedQuery::ListBindingsRequest Request;
public:
TListBindingsBuilder()
@@ -780,14 +782,14 @@ public:
return *this;
}
- const YandexQuery::ListBindingsRequest& Build()
+ const FederatedQuery::ListBindingsRequest& Build()
{
return Request;
}
};
class TDescribeBindingBuilder {
- YandexQuery::DescribeBindingRequest Request;
+ FederatedQuery::DescribeBindingRequest Request;
public:
TDescribeBindingBuilder& SetBindingId(const TString& bindingId)
@@ -796,21 +798,21 @@ public:
return *this;
}
- const YandexQuery::DescribeBindingRequest& Build()
+ const FederatedQuery::DescribeBindingRequest& Build()
{
return Request;
}
};
class TModifyBindingBuilder {
- YandexQuery::ModifyBindingRequest Request;
+ FederatedQuery::ModifyBindingRequest Request;
public:
TModifyBindingBuilder()
{
SetName("test_binding_name_2");
- SetVisibility(YandexQuery::Acl::SCOPE);
- YandexQuery::DataStreamsBinding binding;
+ SetVisibility(FederatedQuery::Acl::SCOPE);
+ FederatedQuery::DataStreamsBinding binding;
binding.set_stream_name("my_stream");
binding.set_format("json");
binding.set_compression("zip");
@@ -826,19 +828,19 @@ public:
return *this;
}
- TModifyBindingBuilder& CreateDataStreams(const YandexQuery::DataStreamsBinding& binding)
+ TModifyBindingBuilder& CreateDataStreams(const FederatedQuery::DataStreamsBinding& binding)
{
*Request.mutable_content()->mutable_setting()->mutable_data_streams() = binding;
return *this;
}
- TModifyBindingBuilder& CreateObjectStorage(const YandexQuery::ObjectStorageBinding& binding)
+ TModifyBindingBuilder& CreateObjectStorage(const FederatedQuery::ObjectStorageBinding& binding)
{
*Request.mutable_content()->mutable_setting()->mutable_object_storage() = binding;
return *this;
}
- TModifyBindingBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility)
+ TModifyBindingBuilder& SetVisibility(FederatedQuery::Acl::Visibility visibility)
{
Request.mutable_content()->mutable_acl()->set_visibility(visibility);
return *this;
@@ -874,14 +876,14 @@ public:
return *this;
}
- const YandexQuery::ModifyBindingRequest& Build()
+ const FederatedQuery::ModifyBindingRequest& Build()
{
return Request;
}
};
class TDeleteBindingBuilder {
- YandexQuery::DeleteBindingRequest Request;
+ FederatedQuery::DeleteBindingRequest Request;
public:
TDeleteBindingBuilder& SetBindingId(const TString& bindingId)
@@ -902,7 +904,7 @@ public:
return *this;
}
- const YandexQuery::DeleteBindingRequest& Build()
+ const FederatedQuery::DeleteBindingRequest& Build()
{
return Request;
}
@@ -957,9 +959,9 @@ public:
return *this;
}
- std::unique_ptr<TEvControlPlaneStorage::TEvWriteResultDataRequest> Build()
+ std::unique_ptr<NYq::TEvControlPlaneStorage::TEvWriteResultDataRequest> Build()
{
- auto request = std::make_unique<TEvControlPlaneStorage::TEvWriteResultDataRequest>();
+ auto request = std::make_unique<NYq::TEvControlPlaneStorage::TEvWriteResultDataRequest>();
request->Request.mutable_result_id()->set_value(ResultId);
*request->Request.mutable_result_set() = ResultSet;
request->Request.set_result_set_id(ResultSetId);
@@ -1004,9 +1006,9 @@ public:
return *this;
}
- std::unique_ptr<TEvControlPlaneStorage::TEvGetTaskRequest> Build()
+ std::unique_ptr<NYq::TEvControlPlaneStorage::TEvGetTaskRequest> Build()
{
- auto request = std::make_unique<TEvControlPlaneStorage::TEvGetTaskRequest>();
+ auto request = std::make_unique<NYq::TEvControlPlaneStorage::TEvGetTaskRequest>();
request->Request.set_tenant(TenantName);
request->Request.set_owner_id(Owner);
request->Request.set_host(HostName);
@@ -1022,11 +1024,11 @@ class TPingTaskBuilder {
TString ResultId;
TString Owner;
TInstant Deadline;
- TMaybe<YandexQuery::QueryMeta::ComputeStatus> Status;
+ TMaybe<FederatedQuery::QueryMeta::ComputeStatus> Status;
TMaybe<NYql::TIssues> Issues;
TMaybe<NYql::TIssues> TransientIssues;
TMaybe<TString> Statistics;
- TMaybe<TVector<YandexQuery::ResultSetMeta>> ResultSetMetas;
+ TMaybe<TVector<FederatedQuery::ResultSetMeta>> ResultSetMetas;
TMaybe<TString> Ast;
TMaybe<TString> Plan;
TMaybe<TInstant> StartedAt;
@@ -1086,7 +1088,7 @@ public:
return *this;
}
- TPingTaskBuilder& SetStatus(const YandexQuery::QueryMeta::ComputeStatus& status)
+ TPingTaskBuilder& SetStatus(const FederatedQuery::QueryMeta::ComputeStatus& status)
{
Status = status;
return *this;
@@ -1110,7 +1112,7 @@ public:
return *this;
}
- TPingTaskBuilder& SetResultSetMetas(const TVector<YandexQuery::ResultSetMeta>& resultSetMetas)
+ TPingTaskBuilder& SetResultSetMetas(const TVector<FederatedQuery::ResultSetMeta>& resultSetMetas)
{
ResultSetMetas = resultSetMetas;
return *this;
@@ -1140,9 +1142,9 @@ public:
return *this;
}
- TPingTaskBuilder& SetResignQuery(bool resignQuery = true)
- {
- ResignQuery = resignQuery;
+ TPingTaskBuilder& SetResignQuery(bool resignQuery = true)
+ {
+ ResignQuery = resignQuery;
return *this;
}
@@ -1170,14 +1172,14 @@ public:
return *this;
}
- std::unique_ptr<TEvControlPlaneStorage::TEvPingTaskRequest> Build()
+ std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> Build()
{
Yq::Private::PingTaskRequest request;
request.set_owner_id(Owner);
request.mutable_query_id()->set_value(QueryId);
request.mutable_result_id()->set_value(ResultId);
if (Status) {
- request.set_status(*Status);
+ request.set_status((YandexQuery::QueryMeta::ComputeStatus)*Status);
}
request.set_status_code(StatusCode);
if (Issues) {
@@ -1191,7 +1193,9 @@ public:
}
if (ResultSetMetas) {
for (const auto& meta : *ResultSetMetas) {
- *request.add_result_set_meta() = meta;
+ YandexQuery::ResultSetMeta casted;
+ casted.CopyFrom(meta);
+ *request.add_result_set_meta() = casted;
}
}
for (const auto& dqGraph : DqGraphs) {
@@ -1226,7 +1230,7 @@ public:
*request.mutable_finished_at() = NProtoInterop::CastToProto(*FinishedAt);
}
- return std::make_unique<TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request));
+ return std::make_unique<NYq::TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request));
}
};
@@ -1285,7 +1289,7 @@ public:
return *this;
}
- std::unique_ptr<TEvControlPlaneStorage::TEvNodesHealthCheckRequest> Build()
+ std::unique_ptr<NYq::TEvControlPlaneStorage::TEvNodesHealthCheckRequest> Build()
{
Yq::Private::NodesHealthCheckRequest request;
request.set_tenant(TenantName);
@@ -1296,7 +1300,7 @@ public:
node.set_active_workers(ActiveWorkers);
node.set_memory_limit(MemoryLimit);
node.set_memory_allocated(MemoryAllocated);
- return std::make_unique<TEvControlPlaneStorage::TEvNodesHealthCheckRequest>(std::move(request));
+ return std::make_unique<NYq::TEvControlPlaneStorage::TEvNodesHealthCheckRequest>(std::move(request));
}
};
diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h b/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h
new file mode 100644
index 00000000000..e0fc67201e1
--- /dev/null
+++ b/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h
@@ -0,0 +1,1305 @@
+// TODO: remove YQ-1055
+
+#pragma once
+
+#include <util/datetime/base.h>
+
+#include <ydb/library/yql/dq/actors/protos/dq_status_codes.pb.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+#include <ydb/public/api/protos/yq.pb.h>
+
+#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
+
+#include <library/cpp/protobuf/interop/cast.h>
+
+namespace NYq {
+
+// Queries
+
+class TCreateQueryBuilder {
+ YandexQuery::CreateQueryRequest Request;
+
+public:
+ TCreateQueryBuilder()
+ {
+ SetMode(YandexQuery::RUN);
+ SetType(YandexQuery::QueryContent::ANALYTICS);
+ SetName("test_query_name_1");
+ SetVisibility(YandexQuery::Acl::SCOPE);
+ SetText("SELECT 1;");
+ }
+
+ TCreateQueryBuilder& SetMode(YandexQuery::ExecuteMode mode)
+ {
+ Request.set_execute_mode(mode);
+ return *this;
+ }
+
+ TCreateQueryBuilder& SetType(YandexQuery::QueryContent::QueryType type)
+ {
+ Request.mutable_content()->set_type(type);
+ return *this;
+ }
+
+ TCreateQueryBuilder& SetAutomatic(bool automatic)
+ {
+ Request.mutable_content()->set_automatic(automatic);
+ return *this;
+ }
+
+ TCreateQueryBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility)
+ {
+ Request.mutable_content()->mutable_acl()->set_visibility(visibility);
+ return *this;
+ }
+
+ TCreateQueryBuilder& SetText(const TString& content)
+ {
+ Request.mutable_content()->set_text(content);
+ return *this;
+ }
+
+ TCreateQueryBuilder& SetName(const TString& name)
+ {
+ Request.mutable_content()->set_name(name);
+ return *this;
+ }
+
+ TCreateQueryBuilder& SetIdempotencyKey(const TString& idempotencyKey)
+ {
+ Request.set_idempotency_key(idempotencyKey);
+ return *this;
+ }
+
+ TCreateQueryBuilder& SetDisposition(const YandexQuery::StreamingDisposition& disposition)
+ {
+ *Request.mutable_disposition() = disposition;
+ return *this;
+ }
+
+ TCreateQueryBuilder& ClearAcl()
+ {
+ Request.mutable_content()->clear_acl();
+ return *this;
+ }
+
+ const YandexQuery::CreateQueryRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TListQueriesBuilder {
+ YandexQuery::ListQueriesRequest Request;
+
+public:
+ TListQueriesBuilder()
+ {
+ SetLimit(10);
+ }
+
+ TListQueriesBuilder& SetPageToken(const TString& pageToken)
+ {
+ Request.set_page_token(pageToken);
+ return *this;
+ }
+
+ TListQueriesBuilder& SetLimit(int64_t limit)
+ {
+ Request.set_limit(limit);
+ return *this;
+ }
+
+ const YandexQuery::ListQueriesRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TDescribeQueryBuilder {
+ YandexQuery::DescribeQueryRequest Request;
+
+public:
+ TDescribeQueryBuilder& SetQueryId(const TString& queryId)
+ {
+ Request.set_query_id(queryId);
+ return *this;
+ }
+
+ const YandexQuery::DescribeQueryRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TGetQueryStatusBuilder {
+ YandexQuery::GetQueryStatusRequest Request;
+
+public:
+ TGetQueryStatusBuilder& SetQueryId(const TString& queryId)
+ {
+ Request.set_query_id(queryId);
+ return *this;
+ }
+
+ const YandexQuery::GetQueryStatusRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TDeleteQueryBuilder {
+ YandexQuery::DeleteQueryRequest Request;
+
+public:
+ TDeleteQueryBuilder& SetQueryId(const TString& queryId)
+ {
+ Request.set_query_id(queryId);
+ return *this;
+ }
+
+ TDeleteQueryBuilder& SetIdempotencyKey(const TString& idempotencyKey)
+ {
+ Request.set_idempotency_key(idempotencyKey);
+ return *this;
+ }
+
+ TDeleteQueryBuilder& SetPreviousRevision(const int64_t periousRevision)
+ {
+ Request.set_previous_revision(periousRevision);
+ return *this;
+ }
+
+ const YandexQuery::DeleteQueryRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TModifyQueryBuilder {
+ YandexQuery::ModifyQueryRequest Request;
+
+public:
+ TModifyQueryBuilder()
+ {
+ SetName("test_query_name_2");
+ SetMode(YandexQuery::RUN);
+ SetType(YandexQuery::QueryContent::ANALYTICS);
+ SetVisibility(YandexQuery::Acl::SCOPE);
+ SetText("SELECT 1;");
+ }
+
+ TModifyQueryBuilder& SetQueryId(const TString& queryId)
+ {
+ Request.set_query_id(queryId);
+ return *this;
+ }
+
+ TModifyQueryBuilder& SetType(YandexQuery::QueryContent::QueryType type)
+ {
+ Request.mutable_content()->set_type(type);
+ return *this;
+ }
+
+ TModifyQueryBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility)
+ {
+ Request.mutable_content()->mutable_acl()->set_visibility(visibility);
+ return *this;
+ }
+
+ TModifyQueryBuilder& SetMode(YandexQuery::ExecuteMode mode)
+ {
+ Request.set_execute_mode(mode);
+ return *this;
+ }
+
+ TModifyQueryBuilder& SetAutomatic(bool automatic)
+ {
+ Request.mutable_content()->set_automatic(automatic);
+ return *this;
+ }
+
+ TModifyQueryBuilder& SetText(const TString& content)
+ {
+ Request.mutable_content()->set_text(content);
+ return *this;
+ }
+
+ TModifyQueryBuilder& SetDisposition(const YandexQuery::StreamingDisposition& disposition)
+ {
+ *Request.mutable_disposition() = disposition;
+ return *this;
+ }
+
+ TModifyQueryBuilder& SetState(const YandexQuery::StateLoadMode& state)
+ {
+ Request.set_state_load_mode(state);
+ return *this;
+ }
+
+ TModifyQueryBuilder& SetName(const TString& name)
+ {
+ Request.mutable_content()->set_name(name);
+ return *this;
+ }
+
+ TModifyQueryBuilder& SetIdempotencyKey(const TString& idempotencyKey)
+ {
+ Request.set_idempotency_key(idempotencyKey);
+ return *this;
+ }
+
+ TModifyQueryBuilder& SetPreviousRevision(const int64_t periousRevision)
+ {
+ Request.set_previous_revision(periousRevision);
+ return *this;
+ }
+
+ TModifyQueryBuilder& SetDescription(const TString& description)
+ {
+ Request.mutable_content()->set_description(description);
+ return *this;
+ }
+
+ const YandexQuery::ModifyQueryRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TControlQueryBuilder {
+ YandexQuery::ControlQueryRequest Request;
+
+public:
+ TControlQueryBuilder()
+ {
+ SetAction(YandexQuery::ABORT);
+ }
+
+ TControlQueryBuilder& SetAction(const YandexQuery::QueryAction& action)
+ {
+ Request.set_action(action);
+ return *this;
+ }
+
+ TControlQueryBuilder& SetQueryId(const TString& queryId)
+ {
+ Request.set_query_id(queryId);
+ return *this;
+ }
+
+ TControlQueryBuilder& SetIdempotencyKey(const TString& idempotencyKey)
+ {
+ Request.set_idempotency_key(idempotencyKey);
+ return *this;
+ }
+
+ TControlQueryBuilder& SetPreviousRevision(const int64_t periousRevision)
+ {
+ Request.set_previous_revision(periousRevision);
+ return *this;
+ }
+
+ const YandexQuery::ControlQueryRequest& Build()
+ {
+ return Request;
+ }
+};
+
+// Results
+
+class TGetResultDataBuilder {
+ YandexQuery::GetResultDataRequest Request;
+
+public:
+ TGetResultDataBuilder()
+ {
+ SetLimit(10);
+ }
+
+ TGetResultDataBuilder& SetQueryId(const TString& queryId)
+ {
+ Request.set_query_id(queryId);
+ return *this;
+ }
+
+ TGetResultDataBuilder& SetResultSetIndex(int64_t resultSetIndex)
+ {
+ Request.set_result_set_index(resultSetIndex);
+ return *this;
+ }
+
+ TGetResultDataBuilder& SetOffset(int64_t offset)
+ {
+ Request.set_offset(offset);
+ return *this;
+ }
+
+ TGetResultDataBuilder& SetLimit(int64_t limit)
+ {
+ Request.set_limit(limit);
+ return *this;
+ }
+
+ const YandexQuery::GetResultDataRequest& Build()
+ {
+ return Request;
+ }
+};
+
+// Jobs
+
+class TListJobsBuilder {
+ YandexQuery::ListJobsRequest Request;
+
+public:
+ TListJobsBuilder()
+ {
+ SetLimit(10);
+ }
+
+ TListJobsBuilder& SetQueryId(const TString& queryId)
+ {
+ Request.mutable_filter()->set_query_id(queryId);
+ return *this;
+ }
+
+ TListJobsBuilder& SetPageToken(const TString& pageToken)
+ {
+ Request.set_page_token(pageToken);
+ return *this;
+ }
+
+ TListJobsBuilder& SetLimit(int64_t limit)
+ {
+ Request.set_limit(limit);
+ return *this;
+ }
+
+ const YandexQuery::ListJobsRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TDescribeJobBuilder {
+ YandexQuery::DescribeJobRequest Request;
+
+public:
+ TDescribeJobBuilder& SetJobId(const TString& jobId)
+ {
+ Request.set_job_id(jobId);
+ return *this;
+ }
+
+ const YandexQuery::DescribeJobRequest& Build()
+ {
+ return Request;
+ }
+};
+
+// Connections
+
+class TCreateConnectionBuilder {
+ YandexQuery::CreateConnectionRequest Request;
+
+public:
+ TCreateConnectionBuilder()
+ {
+ SetName("test_connection_name_1");
+ SetVisibility(YandexQuery::Acl::SCOPE);
+ CreateDataStreams("my_database_id", "");
+ }
+
+ TCreateConnectionBuilder& CreateYdb(const TString& database, const TString& endpoint, const TString& serviceAccount)
+ {
+ auto& ydb = *Request.mutable_content()->mutable_setting()->mutable_ydb_database();
+ if (serviceAccount) {
+ ydb.mutable_auth()->mutable_service_account()->set_id(serviceAccount);
+ } else {
+ ydb.mutable_auth()->mutable_current_iam();
+ }
+
+ ydb.set_database(database);
+ ydb.set_endpoint(endpoint);
+ return *this;
+ }
+
+ TCreateConnectionBuilder& CreateYdb(const TString& databaseId, const TString& serviceAccount)
+ {
+ auto& ydb = *Request.mutable_content()->mutable_setting()->mutable_ydb_database();
+ if (serviceAccount) {
+ ydb.mutable_auth()->mutable_service_account()->set_id(serviceAccount);
+ } else {
+ ydb.mutable_auth()->mutable_current_iam();
+ }
+
+ ydb.set_database_id(databaseId);
+ return *this;
+ }
+
+ TCreateConnectionBuilder& CreateDataStreams(const TString& databaseId, const TString& serviceAccount)
+ {
+ auto& yds = *Request.mutable_content()->mutable_setting()->mutable_data_streams();
+ if (serviceAccount) {
+ yds.mutable_auth()->mutable_service_account()->set_id(serviceAccount);
+ } else {
+ yds.mutable_auth()->mutable_current_iam();
+ }
+
+ yds.set_database_id(databaseId);
+ return *this;
+ }
+
+ TCreateConnectionBuilder& CreateClickHouse(const TString& databaseId, const TString& login, const TString& password, const TString& serviceAccount)
+ {
+ auto& ch = *Request.mutable_content()->mutable_setting()->mutable_clickhouse_cluster();
+ if (serviceAccount) {
+ ch.mutable_auth()->mutable_service_account()->set_id(serviceAccount);
+ } else {
+ ch.mutable_auth()->mutable_current_iam();
+ }
+
+ ch.set_database_id(databaseId);
+ ch.set_login(login);
+ ch.set_password(password);
+ return *this;
+ }
+
+ TCreateConnectionBuilder& CreateObjectStorage(const TString& bucket, const TString& serviceAccount)
+ {
+ auto& os = *Request.mutable_content()->mutable_setting()->mutable_object_storage();
+ if (serviceAccount) {
+ os.mutable_auth()->mutable_service_account()->set_id(serviceAccount);
+ } else {
+ os.mutable_auth()->mutable_current_iam();
+ }
+
+ os.set_bucket(bucket);
+ return *this;
+ }
+
+ TCreateConnectionBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility)
+ {
+ Request.mutable_content()->mutable_acl()->set_visibility(visibility);
+ return *this;
+ }
+
+ TCreateConnectionBuilder& SetName(const TString& name)
+ {
+ Request.mutable_content()->set_name(name);
+ return *this;
+ }
+
+ TCreateConnectionBuilder& SetDescription(const TString& description)
+ {
+ Request.mutable_content()->set_name(description);
+ return *this;
+ }
+
+ TCreateConnectionBuilder& SetIdempotencyKey(const TString& idempotencyKey)
+ {
+ Request.set_idempotency_key(idempotencyKey);
+ return *this;
+ }
+
+ const YandexQuery::CreateConnectionRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TListConnectionsBuilder {
+ YandexQuery::ListConnectionsRequest Request;
+
+public:
+ TListConnectionsBuilder()
+ {
+ SetLimit(10);
+ }
+
+ TListConnectionsBuilder& SetPageToken(const TString& pageToken)
+ {
+ Request.set_page_token(pageToken);
+ return *this;
+ }
+
+ TListConnectionsBuilder& SetLimit(int64_t limit)
+ {
+ Request.set_limit(limit);
+ return *this;
+ }
+
+ const YandexQuery::ListConnectionsRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TDescribeConnectionBuilder {
+ YandexQuery::DescribeConnectionRequest Request;
+
+public:
+ TDescribeConnectionBuilder& SetConnectionId(const TString& connectionId)
+ {
+ Request.set_connection_id(connectionId);
+ return *this;
+ }
+
+ const YandexQuery::DescribeConnectionRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TModifyConnectionBuilder {
+ YandexQuery::ModifyConnectionRequest Request;
+
+public:
+ TModifyConnectionBuilder()
+ {
+ SetName("test_connection_name_2");
+ SetVisibility(YandexQuery::Acl::SCOPE);
+ CreateDataStreams("my_database_id", "");
+ }
+
+ TModifyConnectionBuilder& CreateYdb(const TString& databaseId, const TString& serviceAccount)
+ {
+ auto& ydb = *Request.mutable_content()->mutable_setting()->mutable_ydb_database();
+ if (serviceAccount) {
+ ydb.mutable_auth()->mutable_service_account()->set_id(serviceAccount);
+ } else {
+ ydb.mutable_auth()->mutable_current_iam();
+ }
+
+ ydb.set_database_id(databaseId);
+ return *this;
+ }
+
+ TModifyConnectionBuilder& CreateDataStreams(const TString& databaseId, const TString& serviceAccount)
+ {
+ auto& yds = *Request.mutable_content()->mutable_setting()->mutable_data_streams();
+ if (serviceAccount) {
+ yds.mutable_auth()->mutable_service_account()->set_id(serviceAccount);
+ } else {
+ yds.mutable_auth()->mutable_current_iam();
+ }
+
+ yds.set_database_id(databaseId);
+ return *this;
+ }
+
+ TModifyConnectionBuilder& CreateClickHouse(const TString& databaseId, const TString& login, const TString& password, const TString& serviceAccount)
+ {
+ auto& ch = *Request.mutable_content()->mutable_setting()->mutable_clickhouse_cluster();
+ if (serviceAccount) {
+ ch.mutable_auth()->mutable_service_account()->set_id(serviceAccount);
+ } else {
+ ch.mutable_auth()->mutable_current_iam();
+ }
+
+ ch.set_database_id(databaseId);
+ ch.set_login(login);
+ ch.set_password(password);
+ return *this;
+ }
+
+ TModifyConnectionBuilder& CreateObjectStorage(const TString& bucket, const TString& serviceAccount)
+ {
+ auto& os = *Request.mutable_content()->mutable_setting()->mutable_object_storage();
+ if (serviceAccount) {
+ os.mutable_auth()->mutable_service_account()->set_id(serviceAccount);
+ } else {
+ os.mutable_auth()->mutable_current_iam();
+ }
+
+ os.set_bucket(bucket);
+ return *this;
+ }
+
+ TModifyConnectionBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility)
+ {
+ Request.mutable_content()->mutable_acl()->set_visibility(visibility);
+ return *this;
+ }
+
+ TModifyConnectionBuilder& SetName(const TString& name)
+ {
+ Request.mutable_content()->set_name(name);
+ return *this;
+ }
+
+ TModifyConnectionBuilder& SetDescription(const TString& description)
+ {
+ Request.mutable_content()->set_description(description);
+ return *this;
+ }
+
+ TModifyConnectionBuilder& SetIdempotencyKey(const TString& idempotencyKey)
+ {
+ Request.set_idempotency_key(idempotencyKey);
+ return *this;
+ }
+
+ TModifyConnectionBuilder& SetConnectionId(const TString& connectionId)
+ {
+ Request.set_connection_id(connectionId);
+ return *this;
+ }
+
+ TModifyConnectionBuilder& SetPreviousRevision(const int64_t periousRevision)
+ {
+ Request.set_previous_revision(periousRevision);
+ return *this;
+ }
+
+ const YandexQuery::ModifyConnectionRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TDeleteConnectionBuilder {
+ YandexQuery::DeleteConnectionRequest Request;
+
+public:
+ TDeleteConnectionBuilder& SetConnectionId(const TString& connectionId)
+ {
+ Request.set_connection_id(connectionId);
+ return *this;
+ }
+
+ TDeleteConnectionBuilder& SetIdempotencyKey(const TString& idempotencyKey)
+ {
+ Request.set_idempotency_key(idempotencyKey);
+ return *this;
+ }
+
+ TDeleteConnectionBuilder& SetPreviousRevision(const int64_t periousRevision)
+ {
+ Request.set_previous_revision(periousRevision);
+ return *this;
+ }
+
+ const YandexQuery::DeleteConnectionRequest& Build()
+ {
+ return Request;
+ }
+};
+
+// Bindings
+
+class TCreateBindingBuilder {
+ YandexQuery::CreateBindingRequest Request;
+
+public:
+ TCreateBindingBuilder()
+ {
+ SetName("test_binding_name_1");
+ SetVisibility(YandexQuery::Acl::SCOPE);
+ YandexQuery::DataStreamsBinding binding;
+ binding.set_stream_name("my_stream");
+ binding.set_format("json");
+ binding.set_compression("zip");
+ auto* column = binding.mutable_schema()->add_column();
+ column->set_name("sample_column_name");
+ column->mutable_type()->set_type_id(Ydb::Type::UINT64);
+ CreateDataStreams(binding);
+ }
+
+ TCreateBindingBuilder& SetConnectionId(const TString& connectionId)
+ {
+ Request.mutable_content()->set_connection_id(connectionId);
+ return *this;
+ }
+
+ TCreateBindingBuilder& CreateDataStreams(const YandexQuery::DataStreamsBinding& binding)
+ {
+ *Request.mutable_content()->mutable_setting()->mutable_data_streams() = binding;
+ return *this;
+ }
+
+ TCreateBindingBuilder& CreateObjectStorage(const YandexQuery::ObjectStorageBinding& binding)
+ {
+ *Request.mutable_content()->mutable_setting()->mutable_object_storage() = binding;
+ return *this;
+ }
+
+ TCreateBindingBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility)
+ {
+ Request.mutable_content()->mutable_acl()->set_visibility(visibility);
+ return *this;
+ }
+
+ TCreateBindingBuilder& SetName(const TString& name)
+ {
+ Request.mutable_content()->set_name(name);
+ return *this;
+ }
+
+ TCreateBindingBuilder& SetDescription(const TString& description)
+ {
+ Request.mutable_content()->set_name(description);
+ return *this;
+ }
+
+ TCreateBindingBuilder& SetIdempotencyKey(const TString& idempotencyKey)
+ {
+ Request.set_idempotency_key(idempotencyKey);
+ return *this;
+ }
+
+ const YandexQuery::CreateBindingRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TListBindingsBuilder {
+ YandexQuery::ListBindingsRequest Request;
+
+public:
+ TListBindingsBuilder()
+ {
+ SetLimit(10);
+ }
+
+ TListBindingsBuilder& SetPageToken(const TString& pageToken)
+ {
+ Request.set_page_token(pageToken);
+ return *this;
+ }
+
+ TListBindingsBuilder& SetLimit(int64_t limit)
+ {
+ Request.set_limit(limit);
+ return *this;
+ }
+
+ TListBindingsBuilder& SetConnectionId(const TString& connectionId)
+ {
+ Request.mutable_filter()->set_connection_id(connectionId);
+ return *this;
+ }
+
+ const YandexQuery::ListBindingsRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TDescribeBindingBuilder {
+ YandexQuery::DescribeBindingRequest Request;
+
+public:
+ TDescribeBindingBuilder& SetBindingId(const TString& bindingId)
+ {
+ Request.set_binding_id(bindingId);
+ return *this;
+ }
+
+ const YandexQuery::DescribeBindingRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TModifyBindingBuilder {
+ YandexQuery::ModifyBindingRequest Request;
+
+public:
+ TModifyBindingBuilder()
+ {
+ SetName("test_binding_name_2");
+ SetVisibility(YandexQuery::Acl::SCOPE);
+ YandexQuery::DataStreamsBinding binding;
+ binding.set_stream_name("my_stream");
+ binding.set_format("json");
+ binding.set_compression("zip");
+ auto* column = binding.mutable_schema()->add_column();
+ column->set_name("sample_column_name");
+ column->mutable_type()->set_type_id(Ydb::Type::UINT64);
+ CreateDataStreams(binding);
+ }
+
+ TModifyBindingBuilder& SetConnectionId(const TString& connectionId)
+ {
+ Request.mutable_content()->set_connection_id(connectionId);
+ return *this;
+ }
+
+ TModifyBindingBuilder& CreateDataStreams(const YandexQuery::DataStreamsBinding& binding)
+ {
+ *Request.mutable_content()->mutable_setting()->mutable_data_streams() = binding;
+ return *this;
+ }
+
+ TModifyBindingBuilder& CreateObjectStorage(const YandexQuery::ObjectStorageBinding& binding)
+ {
+ *Request.mutable_content()->mutable_setting()->mutable_object_storage() = binding;
+ return *this;
+ }
+
+ TModifyBindingBuilder& SetVisibility(YandexQuery::Acl::Visibility visibility)
+ {
+ Request.mutable_content()->mutable_acl()->set_visibility(visibility);
+ return *this;
+ }
+
+ TModifyBindingBuilder& SetName(const TString& name)
+ {
+ Request.mutable_content()->set_name(name);
+ return *this;
+ }
+
+ TModifyBindingBuilder& SetDescription(const TString& description)
+ {
+ Request.mutable_content()->set_name(description);
+ return *this;
+ }
+
+ TModifyBindingBuilder& SetIdempotencyKey(const TString& idempotencyKey)
+ {
+ Request.set_idempotency_key(idempotencyKey);
+ return *this;
+ }
+
+ TModifyBindingBuilder& SetBindingId(const TString& bindingId)
+ {
+ Request.set_binding_id(bindingId);
+ return *this;
+ }
+
+ TModifyBindingBuilder& SetPreviousRevision(const int64_t periousRevision)
+ {
+ Request.set_previous_revision(periousRevision);
+ return *this;
+ }
+
+ const YandexQuery::ModifyBindingRequest& Build()
+ {
+ return Request;
+ }
+};
+
+class TDeleteBindingBuilder {
+ YandexQuery::DeleteBindingRequest Request;
+
+public:
+ TDeleteBindingBuilder& SetBindingId(const TString& bindingId)
+ {
+ Request.set_binding_id(bindingId);
+ return *this;
+ }
+
+ TDeleteBindingBuilder& SetIdempotencyKey(const TString& idempotencyKey)
+ {
+ Request.set_idempotency_key(idempotencyKey);
+ return *this;
+ }
+
+ TDeleteBindingBuilder& SetPreviousRevision(const int64_t periousRevision)
+ {
+ Request.set_previous_revision(periousRevision);
+ return *this;
+ }
+
+ const YandexQuery::DeleteBindingRequest& Build()
+ {
+ return Request;
+ }
+};
+
+// internal
+
+class TWriteResultDataBuilder {
+ TString ResultId;
+ int32_t ResultSetId = 0;
+ int64_t StartRowId = 0;
+ TInstant Deadline;
+ Ydb::ResultSet ResultSet;
+
+public:
+ TWriteResultDataBuilder()
+ {
+ SetDeadline(TInstant::Now() + TDuration::Minutes(5));
+ Ydb::ResultSet resultSet;
+ auto& value = *resultSet.add_rows();
+ value.set_int64_value(1);
+ SetResultSet(resultSet);
+ }
+
+ TWriteResultDataBuilder& SetResultId(const TString& resultId)
+ {
+ ResultId = resultId;
+ return *this;
+ }
+
+ TWriteResultDataBuilder& SetResultSetIndex(int32_t resultSetId)
+ {
+ ResultSetId = resultSetId;
+ return *this;
+ }
+
+ TWriteResultDataBuilder& SetStartRowId(int64_t startRowId)
+ {
+ StartRowId = startRowId;
+ return *this;
+ }
+
+ TWriteResultDataBuilder& SetDeadline(const TInstant& deadline)
+ {
+ Deadline = deadline;
+ return *this;
+ }
+
+ TWriteResultDataBuilder& SetResultSet(const Ydb::ResultSet& resultSet)
+ {
+ ResultSet = resultSet;
+ return *this;
+ }
+
+ std::unique_ptr<TEvControlPlaneStorage::TEvWriteResultDataRequest> Build()
+ {
+ auto request = std::make_unique<TEvControlPlaneStorage::TEvWriteResultDataRequest>();
+ request->Request.mutable_result_id()->set_value(ResultId);
+ *request->Request.mutable_result_set() = ResultSet;
+ request->Request.set_result_set_id(ResultSetId);
+ request->Request.set_offset(StartRowId);
+ *request->Request.mutable_deadline() = NProtoInterop::CastToProto(Deadline);
+ return request;
+ }
+};
+
+class TGetTaskBuilder {
+ TString Owner;
+ TString HostName;
+ TString TenantName;
+
+public:
+ TGetTaskBuilder()
+ {
+ SetOwner(DefaultOwner());
+ SetHostName("localhost");
+ SetTenantName("/root/tenant");
+ }
+
+ static TString DefaultOwner() {
+ return "owner";
+ }
+
+ TGetTaskBuilder& SetOwner(const TString& owner)
+ {
+ Owner = owner;
+ return *this;
+ }
+
+ TGetTaskBuilder& SetHostName(const TString& hostName)
+ {
+ HostName = hostName;
+ return *this;
+ }
+
+ TGetTaskBuilder& SetTenantName(const TString& tenantName)
+ {
+ TenantName = tenantName;
+ return *this;
+ }
+
+ std::unique_ptr<TEvControlPlaneStorage::TEvGetTaskRequest> Build()
+ {
+ auto request = std::make_unique<TEvControlPlaneStorage::TEvGetTaskRequest>();
+ request->Request.set_tenant(TenantName);
+ request->Request.set_owner_id(Owner);
+ request->Request.set_host(HostName);
+ return request;
+ }
+};
+
+class TPingTaskBuilder {
+ TString TenantName;
+ TString CloudId;
+ TString Scope;
+ TString QueryId;
+ TString ResultId;
+ TString Owner;
+ TInstant Deadline;
+ TMaybe<YandexQuery::QueryMeta::ComputeStatus> Status;
+ TMaybe<NYql::TIssues> Issues;
+ TMaybe<NYql::TIssues> TransientIssues;
+ TMaybe<TString> Statistics;
+ TMaybe<TVector<YandexQuery::ResultSetMeta>> ResultSetMetas;
+ TMaybe<TString> Ast;
+ TMaybe<TString> Plan;
+ TMaybe<TInstant> StartedAt;
+ TMaybe<TInstant> FinishedAt;
+ bool ResignQuery = false;
+ NYql::NDqProto::StatusIds::StatusCode StatusCode = NYql::NDqProto::StatusIds::UNSPECIFIED;
+ TVector<NYq::TEvControlPlaneStorage::TTopicConsumer> CreatedTopicConsumers;
+ TVector<TString> DqGraphs;
+ i32 DqGraphIndex = 0;
+
+public:
+ TPingTaskBuilder()
+ {
+ SetDeadline(TInstant::Now() + TDuration::Minutes(5));
+ SetTenantName("/root/tenant");
+ }
+
+ TPingTaskBuilder& SetTenantName(const TString& tenantName)
+ {
+ TenantName = tenantName;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetCloudId(const TString& cloudId)
+ {
+ CloudId = cloudId;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetScope(const TString& scope)
+ {
+ Scope = scope;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetQueryId(const TString& queryId)
+ {
+ QueryId = queryId;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetResultId(const TString& resultId)
+ {
+ ResultId = resultId;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetOwner(const TString& owner)
+ {
+ Owner = owner;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetDeadline(const TInstant& deadline)
+ {
+ Deadline = deadline;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetStatus(const YandexQuery::QueryMeta::ComputeStatus& status)
+ {
+ Status = status;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetIssues(const NYql::TIssues& issues)
+ {
+ Issues = issues;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetTransientIssues(const NYql::TIssues& issues)
+ {
+ TransientIssues = issues;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetStatistics(const TString& statistics)
+ {
+ Statistics = statistics;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetResultSetMetas(const TVector<YandexQuery::ResultSetMeta>& resultSetMetas)
+ {
+ ResultSetMetas = resultSetMetas;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetAst(const TString& ast)
+ {
+ Ast = ast;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetPlan(const TString& plan)
+ {
+ Plan = plan;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetStatedAt(const TInstant& started)
+ {
+ StartedAt = started;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetFinishedAt(const TInstant& finished)
+ {
+ FinishedAt = finished;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetResignQuery(bool resignQuery = true)
+ {
+ ResignQuery = resignQuery;
+ return *this;
+ }
+
+ TPingTaskBuilder& SetStatusCode(NYql::NDqProto::StatusIds::StatusCode statusCode = NYql::NDqProto::StatusIds::UNSPECIFIED)
+ {
+ StatusCode = statusCode;
+ return *this;
+ }
+
+ TPingTaskBuilder& AddCreatedConsumer(const TString& databaseId, const TString& database, const TString& topicPath, const TString& consumerName, const TString& clusterEndpoint, bool useSsl)
+ {
+ CreatedTopicConsumers.emplace_back(NYq::TEvControlPlaneStorage::TTopicConsumer{databaseId, database, topicPath, consumerName, clusterEndpoint, useSsl, "", false});
+ return *this;
+ }
+
+ TPingTaskBuilder& AddDqGraph(const TString& dqGraph)
+ {
+ DqGraphs.push_back(dqGraph);
+ return *this;
+ }
+
+ TPingTaskBuilder& SetDqGraphIndex(i32 dqGraphIndex)
+ {
+ DqGraphIndex = dqGraphIndex;
+ return *this;
+ }
+
+ std::unique_ptr<TEvControlPlaneStorage::TEvPingTaskRequest> Build()
+ {
+ Yq::Private::PingTaskRequest request;
+ request.set_owner_id(Owner);
+ request.mutable_query_id()->set_value(QueryId);
+ request.mutable_result_id()->set_value(ResultId);
+ if (Status) {
+ request.set_status(*Status);
+ }
+ request.set_status_code(StatusCode);
+ if (Issues) {
+ NYql::IssuesToMessage(*Issues, request.mutable_issues());
+ }
+ if (TransientIssues) {
+ NYql::IssuesToMessage(*TransientIssues, request.mutable_transient_issues());
+ }
+ if (Statistics) {
+ request.set_statistics(*Statistics);
+ }
+ if (ResultSetMetas) {
+ for (const auto& meta : *ResultSetMetas) {
+ *request.add_result_set_meta() = meta;
+ }
+ }
+ for (const auto& dqGraph : DqGraphs) {
+ request.add_dq_graph(dqGraph);
+ }
+ request.set_dq_graph_index(DqGraphIndex);
+ if (Ast) {
+ request.set_ast(*Ast);
+ }
+ if (Plan) {
+ request.set_plan(*Plan);
+ }
+ request.set_resign_query(ResignQuery);
+ for (const auto& consumer : CreatedTopicConsumers) {
+ auto& cons = *request.add_created_topic_consumers();
+ cons.set_database_id(consumer.DatabaseId);
+ cons.set_database(consumer.Database);
+ cons.set_topic_path(consumer.TopicPath);
+ cons.set_consumer_name(consumer.ConsumerName);
+ cons.set_cluster_endpoint(consumer.ClusterEndpoint);
+ cons.set_use_ssl(consumer.UseSsl);
+ cons.set_token_name(consumer.TokenName);
+ cons.set_add_bearer_to_token(consumer.AddBearerToToken);
+ }
+ request.set_tenant(TenantName);
+ request.set_scope(Scope);
+ *request.mutable_deadline() = NProtoInterop::CastToProto(Deadline);
+ if (StartedAt) {
+ *request.mutable_started_at() = NProtoInterop::CastToProto(*StartedAt);
+ }
+ if (FinishedAt) {
+ *request.mutable_finished_at() = NProtoInterop::CastToProto(*FinishedAt);
+ }
+
+ return std::make_unique<TEvControlPlaneStorage::TEvPingTaskRequest>(std::move(request));
+ }
+};
+
+class TNodesHealthCheckBuilder {
+ TString TenantName;
+ ui32 NodeId = 0;
+ TString HostName;
+ TString InstanceId;
+ ui64 ActiveWorkers = 0;
+ ui64 MemoryLimit = 0;
+ ui64 MemoryAllocated = 0;
+
+public:
+ TNodesHealthCheckBuilder()
+ {}
+
+ TNodesHealthCheckBuilder& SetTenantName(const TString& tenantName)
+ {
+ TenantName = tenantName;
+ return *this;
+ }
+
+ TNodesHealthCheckBuilder& SetNodeId(const ui32& nodeId)
+ {
+ NodeId = nodeId;
+ return *this;
+ }
+
+ TNodesHealthCheckBuilder& SetHostName(const TString& hostName)
+ {
+ HostName = hostName;
+ return *this;
+ }
+
+ TNodesHealthCheckBuilder& SetInstanceId(const TString& instanceId)
+ {
+ InstanceId = instanceId;
+ return *this;
+ }
+
+ TNodesHealthCheckBuilder& SetActiveWorkers(const ui64& activeWorkers)
+ {
+ ActiveWorkers = activeWorkers;
+ return *this;
+ }
+
+ TNodesHealthCheckBuilder& SetMemoryLimit(const ui64& memoryLimit)
+ {
+ MemoryLimit = memoryLimit;
+ return *this;
+ }
+
+ TNodesHealthCheckBuilder& SetMemoryAllocated(const ui64& memoryAllocated)
+ {
+ MemoryAllocated = memoryAllocated;
+ return *this;
+ }
+
+ std::unique_ptr<TEvControlPlaneStorage::TEvNodesHealthCheckRequest> Build()
+ {
+ Yq::Private::NodesHealthCheckRequest request;
+ request.set_tenant(TenantName);
+ auto& node = *request.mutable_node();
+ node.set_node_id(NodeId);
+ node.set_instance_id(InstanceId);
+ node.set_hostname(HostName);
+ node.set_active_workers(ActiveWorkers);
+ node.set_memory_limit(MemoryLimit);
+ node.set_memory_allocated(MemoryAllocated);
+ return std::make_unique<TEvControlPlaneStorage::TEvNodesHealthCheckRequest>(std::move(request));
+ }
+};
+
+}
diff --git a/ydb/core/yq/libs/private_client/CMakeLists.txt b/ydb/core/yq/libs/private_client/CMakeLists.txt
index 0f5f23c5b4d..64e19f535e1 100644
--- a/ydb/core/yq/libs/private_client/CMakeLists.txt
+++ b/ydb/core/yq/libs/private_client/CMakeLists.txt
@@ -25,5 +25,6 @@ target_link_libraries(yq-libs-private_client PUBLIC
target_sources(yq-libs-private_client PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/internal_service.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/loopback_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/private_client_fq.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/private_client.cpp
)
diff --git a/ydb/core/yq/libs/private_client/private_client_fq.cpp b/ydb/core/yq/libs/private_client/private_client_fq.cpp
new file mode 100644
index 00000000000..f3d6992460e
--- /dev/null
+++ b/ydb/core/yq/libs/private_client/private_client_fq.cpp
@@ -0,0 +1,182 @@
+#include "private_client_fq.h"
+#include <ydb/library/yql/public/issue/yql_issue.h>
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
+
+namespace NFq {
+
+using namespace NYdb;
+
+class TPrivateClient::TImpl : public TClientImplCommon<TPrivateClient::TImpl> {
+public:
+ TImpl(
+ std::shared_ptr<TGRpcConnectionsImpl>&& connections,
+ const TCommonClientSettings& settings,
+ const NMonitoring::TDynamicCounterPtr& counters)
+ : TClientImplCommon(std::move(connections), settings)
+ , Counters(counters->GetSubgroup("subsystem", "private_api")->GetSubgroup("subcomponent", "ClientMetrics"))
+ , PingTaskTime(Counters->GetHistogram("PingTaskMs", NMonitoring::ExponentialHistogram(10, 2, 50)))
+ , GetTaskTime(Counters->GetHistogram("GetTaskMs", NMonitoring::ExponentialHistogram(10, 2, 50)))
+ , WriteTaskResultTime(Counters->GetHistogram("WriteTaskResultMs", NMonitoring::ExponentialHistogram(10, 2, 50)))
+ , NodesHealthCheckTime(Counters->GetHistogram("NodesHealthCheckMs", NMonitoring::ExponentialHistogram(10, 2, 50)))
+ {}
+
+ template<class TProtoResult, class TResultWrapper>
+ auto MakeResultExtractor(NThreading::TPromise<TResultWrapper> promise, const NMonitoring::THistogramPtr& hist, TInstant startedAt) {
+ return [=, promise = std::move(promise)]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+ std::unique_ptr<TProtoResult> result;
+ if (any) {
+ result.reset(new TProtoResult);
+ any->UnpackTo(result.get());
+ }
+
+ hist->Collect((TInstant::Now() - startedAt).MilliSeconds());
+
+ promise.SetValue(
+ TResultWrapper(
+ TStatus(std::move(status)),
+ std::move(result)));
+ };
+ }
+
+ TAsyncPingTaskResult PingTask(
+ Fq::Private::PingTaskRequest&& request,
+ const TPingTaskSettings& settings) {
+ const auto startedAt = TInstant::Now();
+ auto promise = NThreading::NewPromise<TPingTaskResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ Fq::Private::PingTaskResult,
+ TPingTaskResult>(std::move(promise), PingTaskTime, startedAt);
+
+ Connections_->RunDeferred<
+ Fq::Private::V1::FqPrivateTaskService,
+ Fq::Private::PingTaskRequest,
+ Fq::Private::PingTaskResponse>(
+ std::move(request),
+ std::move(extractor),
+ &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncPingTask,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncGetTaskResult GetTask(
+ Fq::Private::GetTaskRequest&& request,
+ const TGetTaskSettings& settings) {
+ const auto startedAt = TInstant::Now();
+ auto promise = NThreading::NewPromise<TGetTaskResult>();
+ auto future = promise.GetFuture();
+ auto extractor = MakeResultExtractor<
+ Fq::Private::GetTaskResult,
+ TGetTaskResult>(std::move(promise), GetTaskTime, startedAt);
+
+ Connections_->RunDeferred<
+ Fq::Private::V1::FqPrivateTaskService,
+ Fq::Private::GetTaskRequest,
+ Fq::Private::GetTaskResponse>(
+ std::move(request),
+ std::move(extractor),
+ &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncGetTask,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncWriteTaskResult WriteTaskResult(
+ Fq::Private::WriteTaskResultRequest&& request,
+ const TWriteTaskResultSettings& settings) {
+ const auto startedAt = TInstant::Now();
+ auto promise = NThreading::NewPromise<TWriteTaskResult>();
+ auto future = promise.GetFuture();
+ auto extractor = MakeResultExtractor<
+ Fq::Private::WriteTaskResultResult,
+ TWriteTaskResult>(std::move(promise), WriteTaskResultTime, startedAt);
+
+ Connections_->RunDeferred<
+ Fq::Private::V1::FqPrivateTaskService,
+ Fq::Private::WriteTaskResultRequest,
+ Fq::Private::WriteTaskResultResponse>(
+ std::move(request),
+ std::move(extractor),
+ &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncWriteTaskResult,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncNodesHealthCheckResult NodesHealthCheck(
+ Fq::Private::NodesHealthCheckRequest&& request,
+ const TNodesHealthCheckSettings& settings) {
+ const auto startedAt = TInstant::Now();
+ auto promise = NThreading::NewPromise<TNodesHealthCheckResult>();
+ auto future = promise.GetFuture();
+ auto extractor = MakeResultExtractor<
+ Fq::Private::NodesHealthCheckResult,
+ TNodesHealthCheckResult>(std::move(promise), NodesHealthCheckTime, startedAt);
+
+ Connections_->RunDeferred<
+ Fq::Private::V1::FqPrivateTaskService,
+ Fq::Private::NodesHealthCheckRequest,
+ Fq::Private::NodesHealthCheckResponse>(
+ std::move(request),
+ std::move(extractor),
+ &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncNodesHealthCheck,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+private:
+ const NMonitoring::TDynamicCounterPtr Counters;
+ const NMonitoring::THistogramPtr PingTaskTime;
+ const NMonitoring::THistogramPtr GetTaskTime;
+ const NMonitoring::THistogramPtr WriteTaskResultTime;
+ const NMonitoring::THistogramPtr NodesHealthCheckTime;
+};
+
+TPrivateClient::TPrivateClient(
+ const TDriver& driver,
+ const TCommonClientSettings& settings,
+ const NMonitoring::TDynamicCounterPtr& counters)
+ : Impl(new TImpl(CreateInternalInterface(driver), settings, counters))
+{}
+
+TAsyncPingTaskResult TPrivateClient::PingTask(
+ Fq::Private::PingTaskRequest&& request,
+ const TPingTaskSettings& settings) {
+ return Impl->PingTask(std::move(request), settings);
+}
+
+TAsyncGetTaskResult TPrivateClient::GetTask(
+ Fq::Private::GetTaskRequest&& request,
+ const TGetTaskSettings& settings) {
+ return Impl->GetTask(std::move(request), settings);
+}
+
+TAsyncWriteTaskResult TPrivateClient::WriteTaskResult(
+ Fq::Private::WriteTaskResultRequest&& request,
+ const TWriteTaskResultSettings& settings) {
+ return Impl->WriteTaskResult(std::move(request), settings);
+}
+
+TAsyncNodesHealthCheckResult TPrivateClient::NodesHealthCheck(
+ Fq::Private::NodesHealthCheckRequest&& request,
+ const TNodesHealthCheckSettings& settings) {
+ return Impl->NodesHealthCheck(std::move(request), settings);
+}
+
+} //NFq
diff --git a/ydb/core/yq/libs/private_client/private_client_fq.h b/ydb/core/yq/libs/private_client/private_client_fq.h
new file mode 100644
index 00000000000..2333821cf35
--- /dev/null
+++ b/ydb/core/yq/libs/private_client/private_client_fq.h
@@ -0,0 +1,82 @@
+#pragma once
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+#include <ydb/public/api/grpc/draft/yql_db_v1_fq.grpc.pb.h>
+#include <ydb/core/yq/libs/protos/fq_private.pb.h>
+
+namespace NFq {
+
+template<class TProtoResult>
+class TProtoResultInternalWrapper : public NYdb::TStatus {
+ friend class TPrivateClient;
+
+public:
+ TProtoResultInternalWrapper(
+ NYdb::TStatus&& status,
+ std::unique_ptr<TProtoResult> result)
+ : TStatus(std::move(status))
+ , Result(std::move(result))
+ { }
+
+public:
+ const TProtoResult& GetResult() const {
+ Y_VERIFY(Result, "Uninitialized result");
+ return *Result;
+ }
+
+ bool IsResultSet() const {
+ return Result ? true : false;
+ }
+
+private:
+ std::unique_ptr<TProtoResult> Result;
+};
+
+
+using TGetTaskResult = TProtoResultInternalWrapper<Fq::Private::GetTaskResult>;
+using TPingTaskResult = TProtoResultInternalWrapper<Fq::Private::PingTaskResult>;
+using TWriteTaskResult = TProtoResultInternalWrapper<Fq::Private::WriteTaskResultResult>;
+using TNodesHealthCheckResult = TProtoResultInternalWrapper<Fq::Private::NodesHealthCheckResult>;
+
+using TAsyncGetTaskResult = NThreading::TFuture<TGetTaskResult>;
+using TAsyncPingTaskResult = NThreading::TFuture<TPingTaskResult>;
+using TAsyncWriteTaskResult = NThreading::TFuture<TWriteTaskResult>;
+using TAsyncNodesHealthCheckResult = NThreading::TFuture<TNodesHealthCheckResult>;
+
+struct TGetTaskSettings : public NYdb::TOperationRequestSettings<TGetTaskSettings> {};
+struct TPingTaskSettings : public NYdb::TOperationRequestSettings<TPingTaskSettings> {};
+struct TWriteTaskResultSettings : public NYdb::TOperationRequestSettings<TWriteTaskResultSettings> {};
+struct TNodesHealthCheckSettings : public NYdb::TOperationRequestSettings<TNodesHealthCheckSettings> {};
+
+class TPrivateClient {
+ class TImpl;
+
+public:
+ TPrivateClient(
+ const NYdb::TDriver& driver,
+ const NYdb::TCommonClientSettings& settings = NYdb::TCommonClientSettings(),
+ const NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<NMonitoring::TDynamicCounters>());
+
+ TAsyncGetTaskResult GetTask(
+ Fq::Private::GetTaskRequest&& request,
+ const TGetTaskSettings& settings = TGetTaskSettings());
+
+ TAsyncPingTaskResult PingTask(
+ Fq::Private::PingTaskRequest&& request,
+ const TPingTaskSettings& settings = TPingTaskSettings());
+
+ TAsyncWriteTaskResult WriteTaskResult(
+ Fq::Private::WriteTaskResultRequest&& request,
+ const TWriteTaskResultSettings& settings = TWriteTaskResultSettings());
+
+ TAsyncNodesHealthCheckResult NodesHealthCheck(
+ Fq::Private::NodesHealthCheckRequest&& request,
+ const TNodesHealthCheckSettings& settings = TNodesHealthCheckSettings());
+
+private:
+ std::shared_ptr<TImpl> Impl;
+};
+
+} // namespace NFq
diff --git a/ydb/core/yq/libs/protos/CMakeLists.txt b/ydb/core/yq/libs/protos/CMakeLists.txt
index 8761fc37f06..2d6538dedff 100644
--- a/ydb/core/yq/libs/protos/CMakeLists.txt
+++ b/ydb/core/yq/libs/protos/CMakeLists.txt
@@ -16,6 +16,7 @@ target_link_libraries(yq-libs-protos PUBLIC
contrib-libs-protobuf
)
target_proto_messages(yq-libs-protos PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/protos/fq_private.proto
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/protos/yq_private.proto
)
target_proto_addincls(yq-libs-protos
diff --git a/ydb/core/yq/libs/protos/fq_private.proto b/ydb/core/yq/libs/protos/fq_private.proto
new file mode 100644
index 00000000000..95f486ae834
--- /dev/null
+++ b/ydb/core/yq/libs/protos/fq_private.proto
@@ -0,0 +1,168 @@
+syntax = "proto3";
+option cc_enable_arenas = true;
+
+package Fq.Private;
+
+import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto";
+
+import "ydb/public/api/protos/ydb_operation.proto";
+import "ydb/public/api/protos/ydb_value.proto";
+import "ydb/public/api/protos/ydb_issue_message.proto";
+import "ydb/public/api/protos/annotations/validation.proto";
+
+import "ydb/public/api/protos/fq.proto";
+
+import "google/protobuf/timestamp.proto";
+
+////////////////////////////////////////////////////////////
+
+message GetTaskRequest {
+ string tenant = 1;
+ string owner_id = 2; // guid, should be refreshed on node restart
+ string host = 3;
+ Ydb.Operations.OperationParams operation_params = 4;
+
+ message Filter {
+ string query_executor = 1 [(Ydb.length).le = 100];
+ }
+ Filter filter = 5;
+}
+
+message SignedIdentity {
+ string value = 1;
+ string signature = 2;
+}
+
+message TopicConsumer {
+ string database_id = 1;
+ string database = 2;
+ string topic_path = 3;
+ string consumer_name = 4;
+ string cluster_endpoint = 5;
+ bool use_ssl = 6;
+ string token_name = 7;
+ bool add_bearer_to_token = 8;
+}
+
+message GetTaskResult {
+ message Task {
+ // come back later in 10 sec ?
+ SignedIdentity result_id = 1;
+ SignedIdentity query_id = 2;
+ SignedIdentity job_id = 3;
+ uint64 generation = 4;
+
+ bool streaming = 5;
+ repeated bytes dq_graph = 6;
+ // text, connection and binding are empty if dq_graph is not empty
+ string text = 7;
+ repeated FederatedQuery.Connection connection = 8;
+ repeated FederatedQuery.Binding binding = 9;
+
+ string user_token = 10; // IAM token for debug
+ repeated SignedIdentity service_accounts = 11;
+ string user_id = 12;
+ FederatedQuery.QueryContent.QueryType query_type = 13;
+ string scope = 14;
+ FederatedQuery.ExecuteMode execute_mode = 15;
+ FederatedQuery.StateLoadMode state_load_mode = 16;
+ FederatedQuery.QueryMeta.ComputeStatus status = 17;
+ repeated FederatedQuery.ResultSetMeta result_set_meta = 18;
+ repeated TopicConsumer created_topic_consumers = 19;
+ int32 dq_graph_index = 20;
+ map<string, string> sensor_labels = 21;
+
+ bool automatic = 22;
+ string query_name = 23;
+ google.protobuf.Timestamp deadline = 24;
+ FederatedQuery.StreamingDisposition disposition = 25;
+ uint64 result_limit = 26;
+ }
+ repeated Task tasks = 1;
+}
+
+message GetTaskResponse {
+ Ydb.Operations.Operation operation = 1; // GetTaskResult
+}
+
+message PingTaskRequest {
+ string owner_id = 1;
+ SignedIdentity query_id = 2;
+ SignedIdentity job_id = 3;
+ SignedIdentity result_id = 4;
+ FederatedQuery.QueryMeta.ComputeStatus status = 5;
+ NYql.NDqProto.StatusIds.StatusCode status_code = 21;
+ repeated Ydb.Issue.IssueMessage issues = 6;
+ repeated Ydb.Issue.IssueMessage transient_issues = 16;
+ uint32 result_set_count = 7;
+ string statistics = 8;
+ repeated FederatedQuery.ResultSetMeta result_set_meta = 9;
+ string executer_info = 10;
+ repeated bytes dq_graph = 11;
+ int32 dq_graph_index = 20;
+ string ast = 12;
+ string plan = 13;
+ bool resign_query = 14;
+ repeated TopicConsumer created_topic_consumers = 17;
+ FederatedQuery.StateLoadMode state_load_mode = 18;
+ FederatedQuery.StreamingDisposition disposition = 19;
+ Ydb.Operations.OperationParams operation_params = 15;
+ string scope = 100;
+ string tenant = 104;
+ google.protobuf.Timestamp started_at = 101;
+ google.protobuf.Timestamp finished_at = 102;
+ google.protobuf.Timestamp deadline = 103;
+}
+
+message PingTaskResult {
+ FederatedQuery.QueryAction action = 1;
+}
+
+message PingTaskResponse {
+ Ydb.Operations.Operation operation = 1; // PingTaskResult
+}
+
+message WriteTaskResultRequest {
+ string owner_id = 1;
+ SignedIdentity result_id = 2;
+ Ydb.ResultSet result_set = 3;
+ uint32 result_set_id = 4;
+ uint64 offset = 5;
+ uint64 request_id = 6;
+ Ydb.Operations.OperationParams operation_params = 7;
+ google.protobuf.Timestamp deadline = 8;
+}
+
+message WriteTaskResultResult {
+ uint64 request_id = 1;
+}
+
+message WriteTaskResultResponse {
+ Ydb.Operations.Operation operation = 1; // WriteRowsResultResult
+}
+
+message NodeInfo {
+ uint32 node_id = 1;
+ string instance_id = 2;
+ string hostname = 3;
+ uint64 active_workers = 4;
+ uint64 memory_limit = 5;
+ uint64 memory_allocated = 6;
+ uint32 interconnect_port = 7;
+ string node_address = 8;
+ string data_center = 9;
+}
+
+message NodesHealthCheckRequest {
+ string tenant = 1;
+ NodeInfo node = 2;
+ Ydb.Operations.OperationParams operation_params = 6;
+}
+
+message NodesHealthCheckResult {
+ repeated NodeInfo nodes = 1;
+}
+
+message NodesHealthCheckResponse {
+ Ydb.Operations.Operation operation = 1; // NodesHealthCheckResult
+}
diff --git a/ydb/public/api/grpc/CMakeLists.txt b/ydb/public/api/grpc/CMakeLists.txt
index f3f8fd2ca09..df5495b4370 100644
--- a/ydb/public/api/grpc/CMakeLists.txt
+++ b/ydb/public/api/grpc/CMakeLists.txt
@@ -19,14 +19,15 @@ target_link_libraries(api-grpc PUBLIC
contrib-libs-protobuf
)
target_proto_messages(api-grpc PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/fq_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_auth_v1.proto
+ ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_cms_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_coordination_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_discovery_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_export_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_import_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_monitoring_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_operation_v1.proto
- ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_cms_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_rate_limiter_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_scheme_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/ydb_scripting_v1.proto
diff --git a/ydb/public/api/grpc/draft/CMakeLists.txt b/ydb/public/api/grpc/draft/CMakeLists.txt
index 2093014f616..f048ca35d81 100644
--- a/ydb/public/api/grpc/draft/CMakeLists.txt
+++ b/ydb/public/api/grpc/draft/CMakeLists.txt
@@ -29,6 +29,7 @@ target_proto_messages(api-grpc-draft PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_s3_internal_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto
+ ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/yql_db_v1_fq.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/yql_db_v1.proto
)
target_proto_addincls(api-grpc-draft
diff --git a/ydb/public/api/grpc/draft/yql_db_v1_fq.proto b/ydb/public/api/grpc/draft/yql_db_v1_fq.proto
new file mode 100644
index 00000000000..cc403e93a43
--- /dev/null
+++ b/ydb/public/api/grpc/draft/yql_db_v1_fq.proto
@@ -0,0 +1,19 @@
+syntax = "proto3";
+
+package Fq.Private.V1;
+
+import "ydb/core/yq/libs/protos/fq_private.proto";
+
+service FqPrivateTaskService {
+ // gets new task
+ rpc GetTask(Fq.Private.GetTaskRequest) returns (Fq.Private.GetTaskResponse);
+
+ // pings new task (also can update metadata)
+ rpc PingTask(Fq.Private.PingTaskRequest) returns (Fq.Private.PingTaskResponse);
+
+ // writes rows
+ rpc WriteTaskResult(Fq.Private.WriteTaskResultRequest) returns (Fq.Private.WriteTaskResultResponse);
+
+ //Nodes
+ rpc NodesHealthCheck(Fq.Private.NodesHealthCheckRequest) returns (Fq.Private.NodesHealthCheckResponse);
+}
diff --git a/ydb/public/api/grpc/fq_v1.proto b/ydb/public/api/grpc/fq_v1.proto
new file mode 100644
index 00000000000..0b52f8bc037
--- /dev/null
+++ b/ydb/public/api/grpc/fq_v1.proto
@@ -0,0 +1,79 @@
+syntax = "proto3";
+
+package FederatedQuery.V1;
+option java_package = "com.federated.query.v1";
+
+import "ydb/public/api/protos/fq.proto";
+
+service FederatedQueryService {
+ // Query
+ // Query is the text of an SQL request, the results of the last run and the state after the last run (partitions offsets, consumer in YDS)
+ // Create a query object with a given SQL
+ rpc CreateQuery(FederatedQuery.CreateQueryRequest) returns (FederatedQuery.CreateQueryResponse);
+
+ // Get a list of brief queries objects
+ rpc ListQueries(FederatedQuery.ListQueriesRequest) returns (FederatedQuery.ListQueriesResponse);
+
+ // Get full information about the object of the query
+ rpc DescribeQuery(FederatedQuery.DescribeQueryRequest) returns (FederatedQuery.DescribeQueryResponse);
+
+ // Get status of the query
+ rpc GetQueryStatus(FederatedQuery.GetQueryStatusRequest) returns (FederatedQuery.GetQueryStatusResponse);
+
+ // Change the attributes of the query (acl, name, ...)
+ rpc ModifyQuery(FederatedQuery.ModifyQueryRequest) returns (FederatedQuery.ModifyQueryResponse);
+
+ // Completely delete the query
+ rpc DeleteQuery(FederatedQuery.DeleteQueryRequest) returns (FederatedQuery.DeleteQueryResponse);
+
+ // Change the state of the query lifecycle
+ rpc ControlQuery(FederatedQuery.ControlQueryRequest) returns (FederatedQuery.ControlQueryResponse);
+
+ // Get a results page
+ rpc GetResultData(FederatedQuery.GetResultDataRequest) returns (FederatedQuery.GetResultDataResponse);
+
+ // Job
+ // Job - appears immediately after starting the request and contains the request metadata
+ // Get a list of jobs
+ rpc ListJobs(FederatedQuery.ListJobsRequest) returns (FederatedQuery.ListJobsResponse);
+
+ // Get information about the job
+ rpc DescribeJob(FederatedQuery.DescribeJobRequest) returns (FederatedQuery.DescribeJobResponse);
+
+ // Connection
+ // Connection - entity that describes connection points. This can be imagined as an analogue of a network address.
+ // Create a connection object (ObjectStorage, YDB, YDS, ...)
+ rpc CreateConnection(FederatedQuery.CreateConnectionRequest) returns (FederatedQuery.CreateConnectionResponse);
+
+ // Get a list of connections objects
+ rpc ListConnections(FederatedQuery.ListConnectionsRequest) returns (FederatedQuery.ListConnectionsResponse);
+
+ // Get information about the object of the connection
+ rpc DescribeConnection(FederatedQuery.DescribeConnectionRequest) returns (FederatedQuery.DescribeConnectionResponse);
+
+ // Change the attributes of the connection
+ rpc ModifyConnection(FederatedQuery.ModifyConnectionRequest) returns (FederatedQuery.ModifyConnectionResponse);
+
+ // Completely delete the connection
+ rpc DeleteConnection(FederatedQuery.DeleteConnectionRequest) returns (FederatedQuery.DeleteConnectionResponse);
+
+ // Test the connection (permissions, network, ...)
+ rpc TestConnection(FederatedQuery.TestConnectionRequest) returns (FederatedQuery.TestConnectionResponse);
+
+ // Binding
+ // Binding - entity using which a schema is assigned to non-schematic data
+ // Create a binding object - bind schema with ObjectStorage object or YDS stream
+ rpc CreateBinding(FederatedQuery.CreateBindingRequest) returns (FederatedQuery.CreateBindingResponse);
+
+ // Get a list of bindings objects
+ rpc ListBindings(FederatedQuery.ListBindingsRequest) returns (FederatedQuery.ListBindingsResponse);
+
+ // Get information about the object of the binding
+ rpc DescribeBinding(FederatedQuery.DescribeBindingRequest) returns (FederatedQuery.DescribeBindingResponse);
+
+ // Change the attributes of the binding
+ rpc ModifyBinding(FederatedQuery.ModifyBindingRequest) returns (FederatedQuery.ModifyBindingResponse);
+
+ // Completely delete the binding
+ rpc DeleteBinding(FederatedQuery.DeleteBindingRequest) returns (FederatedQuery.DeleteBindingResponse);
+}
diff --git a/ydb/public/api/protos/CMakeLists.txt b/ydb/public/api/protos/CMakeLists.txt
index 797619ffbd7..98e03a01ab5 100644
--- a/ydb/public/api/protos/CMakeLists.txt
+++ b/ydb/public/api/protos/CMakeLists.txt
@@ -21,6 +21,7 @@ target_proto_messages(api-protos PRIVATE
${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/persqueue_error_codes.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_long_tx.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/protos/draft/ydb_logstore.proto
+ ${CMAKE_SOURCE_DIR}/ydb/public/api/protos/fq.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/protos/persqueue_error_codes_v1.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_auth.proto
${CMAKE_SOURCE_DIR}/ydb/public/api/protos/ydb_persqueue_v1.proto
diff --git a/ydb/public/api/protos/fq.proto b/ydb/public/api/protos/fq.proto
new file mode 100644
index 00000000000..2e08f76543c
--- /dev/null
+++ b/ydb/public/api/protos/fq.proto
@@ -0,0 +1,743 @@
+syntax = "proto3";
+option cc_enable_arenas = true;
+
+package FederatedQuery;
+option java_package = "com.yandex.query";
+option java_outer_classname = "FederatedQueryProtos";
+
+import "ydb/public/api/protos/annotations/sensitive.proto";
+import "ydb/public/api/protos/annotations/validation.proto";
+import "ydb/public/api/protos/ydb_operation.proto";
+import "ydb/public/api/protos/ydb_value.proto";
+import "ydb/public/api/protos/ydb_issue_message.proto";
+
+import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
+import "google/protobuf/empty.proto";
+
+////////////////////////////////////////////////////////////
+
+// === Query API ===
+
+// Header: ydb-fq-project => yandexcloud://cloud_id/folder_id
+
+message Acl {
+ enum Visibility {
+ VISIBILITY_UNSPECIFIED = 0;
+ PRIVATE = 1; // Visibility only for the creator of the entity
+ SCOPE = 2; // Visibility for subjects within scope
+ }
+ Visibility visibility = 1;
+}
+
+message Limits {
+ // Used only for streaming queries
+ int64 vcpu_rate_limit = 1 [(Ydb.value) = ">= 0"]; // 0.01 vcpu per second
+ int64 flow_rate_limit = 2 [(Ydb.value) = ">= 0"]; // Bytes per second
+ int64 vcpu_time_limit = 3 [(Ydb.value) = ">= 0"]; // Milliseconds per second
+
+ // Used only for analytics queries
+ int64 max_result_size = 4 [(Ydb.value) = ">= 0"]; // Bytes
+ int64 max_result_rows = 5 [(Ydb.value) = ">= 0"]; // Count
+
+ // Common limits
+ int64 memory_limit = 6 [(Ydb.value) = ">= 0"]; // Bytes
+ google.protobuf.Duration result_ttl = 7;
+}
+
+enum ExecuteMode {
+ EXECUTE_MODE_UNSPECIFIED = 0;
+ SAVE = 1; // Save a query without changing its state
+ PARSE = 2; // Parse the query
+ COMPILE = 3; // Parse and compile the query
+ VALIDATE = 4; // Parse, compile and validate the query
+ EXPLAIN = 5; // High-level query plan that specifies only physical operations and non-temporary table names
+ RUN = 6; // Do all the previous + execution of the query
+}
+
+enum QueryAction {
+ QUERY_ACTION_UNSPECIFIED = 0;
+ PAUSE = 1; // Pause the query, with the possibility of its quick resumption
+ PAUSE_GRACEFULLY = 2; // Similar to PAUSE, only suspends the query allowing it to pause in checkpoint. Can work for a long time
+ ABORT = 3; // Stop the query
+ ABORT_GRACEFULLY = 4; // Similar to ABORT, only stops the query in checkpoint
+ RESUME = 5; // Resumes the execution of the query. Works only for PAUSE queries
+}
+
+enum StateLoadMode {
+ STATE_LOAD_MODE_UNSPECIFIED = 0;
+ EMPTY = 1; // Start the query with an empty state
+ FROM_LAST_CHECKPOINT = 2; // Start the query with the state that is saved in the last checkpoint
+}
+
+// For streaming queries only
+message StreamingDisposition {
+ message FromTime {
+ google.protobuf.Timestamp timestamp = 1;
+ }
+
+ message TimeAgo {
+ google.protobuf.Duration duration = 1;
+ }
+
+ message FromLastCheckpoint {
+ // By default if new query streams set doesn't equal to old query streams set,
+ // error will occur and query won't be allowed to load offsets for streams for the last checkpoint.
+ // If this flag is set all offsets that can be matched with previous query checkpoint will be matched.
+ // Others will use "fresh" streaming disposition.
+ bool force = 1;
+ }
+
+ oneof disposition {
+ google.protobuf.Empty oldest = 1; // Start processing with the oldest offset
+ google.protobuf.Empty fresh = 2; // Start processing with the fresh offset
+ FromTime from_time = 3; // Start processing with offset from the specified time
+ TimeAgo time_ago = 4; // Start processing with offset some time ago
+ FromLastCheckpoint from_last_checkpoint = 5; // Start processing with offset which corresponds to the last checkpoint
+ }
+}
+
+// Query information that the subject can change
+message QueryContent {
+ enum QueryType {
+ QUERY_TYPE_UNSPECIFIED = 0;
+ ANALYTICS = 1; // Analytical query (used for analytical data processing for example to work with YDB, ClickHouse, ...)
+ STREAMING = 2; // Streaming query (used for streaming data processing, such as working with YDS)
+ }
+ QueryType type = 1;
+ string name = 2 [(Ydb.length).le = 1024];
+ Acl acl = 3;
+ Limits limits = 4;
+ string text = 5 [(Ydb.length).le = 102400]; // The text of the query itself
+ bool automatic = 6; // Is used for queries that are created by automatic systems (robots, jdbc driver, ...)
+ string description = 7 [(Ydb.length).le = 10240]; // Description of the query, there can be any text
+ // Specified settings for query's executor
+ // Well known settings are:
+ // "executor" - type of executor for this query
+ map<string, string> execution_settings = 10 [(Ydb.map_key).length.range = {min: 1, max: 100}, (Ydb.length).le = 4096];
+}
+
+message CommonMeta {
+ string id = 1 [(Ydb.length).range = {min: 1, max: 1024}];
+ string created_by = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+ string modified_by = 3 [(Ydb.length).range = {min: 1, max: 1024}];
+ google.protobuf.Timestamp created_at = 4;
+ google.protobuf.Timestamp modified_at = 5;
+ int64 revision = 6 [(Ydb.value) = ">= 0"]; // Entity version, increases with each change
+}
+
+message QueryMeta {
+ enum ComputeStatus {
+ COMPUTE_STATUS_UNSPECIFIED = 0;
+ STARTING = 1; // Start execution of the action on query
+ ABORTED_BY_USER = 2; // Query aborted by user
+ ABORTED_BY_SYSTEM = 3; // Query aborted by system
+ ABORTING_BY_USER = 4; // Query aborting by user
+ ABORTING_BY_SYSTEM = 5; // Query aborting by system
+ RESUMING = 6; // Resuming query execution from PAUSED status
+ RUNNING = 7; // Query started for execution
+ COMPLETED = 8; // Query completed successfully
+ COMPLETING = 12; // Finalizing query before become COMPLETED
+ FAILED = 9; // Query completed with errors
+ FAILING = 13; // Finalizing query before become FAILED
+ PAUSED = 11; // Query paused
+ PAUSING = 10; // Query starts pausing
+ }
+
+ CommonMeta common = 1;
+ google.protobuf.Timestamp started_at = 2;
+ google.protobuf.Timestamp finished_at = 3;
+ ExecuteMode execute_mode = 4;
+ ComputeStatus status = 5;
+ int64 last_job_query_revision = 6;
+ string last_job_id = 7;
+ google.protobuf.Timestamp expire_at = 8;
+ google.protobuf.Timestamp result_expire_at = 9;
+ string started_by = 10;
+ oneof action {
+ string aborted_by = 11;
+ string paused_by = 12;
+ }
+ // One of the versions of this query has fully saved checkpoint.
+ // If this flag is not set streaming disposition mode "from last checkpoint" can't be used.
+ bool has_saved_checkpoints = 13;
+}
+
+message BriefQuery {
+ QueryContent.QueryType type = 1;
+ string name = 2 [(Ydb.length).le = 1024];
+ QueryMeta meta = 3;
+ Acl.Visibility visibility = 4;
+ bool automatic = 5;
+}
+
+message QueryPlan {
+ string json = 1; // No validation because generated on server side
+}
+
+message QueryAst {
+ string data = 1;
+}
+
+message ResultSetMeta {
+ repeated Ydb.Column column = 1;
+ int64 rows_count = 2 [(Ydb.value) = ">= 0"];
+ bool truncated = 3;
+}
+
+message Query {
+ QueryMeta meta = 1;
+ QueryContent content = 2;
+ QueryPlan plan = 3;
+ repeated Ydb.Issue.IssueMessage issue = 4;
+ repeated Ydb.Issue.IssueMessage transient_issue = 5;
+ QueryStatistics statistics = 6;
+ repeated ResultSetMeta result_set_meta = 7;
+ QueryAst ast = 8;
+}
+
+message QueryStatistics {
+ string json = 1; // No validation because generated on server side
+}
+
+// Create a new query
+message CreateQueryRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ QueryContent content = 2;
+ ExecuteMode execute_mode = 3;
+ StreamingDisposition disposition = 4;
+ string idempotency_key = 5 [(Ydb.length).le = 1024];
+}
+
+message CreateQueryResponse {
+ Ydb.Operations.Operation operation = 1; // CreateQueryResult
+}
+
+message CreateQueryResult {
+ string query_id = 1 [(Ydb.length).le = 1024];
+}
+
+enum AutomaticType {
+ AUTOMATIC_TYPE_UNSPECIFIED = 0;
+ AUTOMATIC = 1;
+ NOT_AUTOMATIC = 2;
+}
+
+// Getting brief information about queries
+message ListQueriesRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string page_token = 2 [(Ydb.length).le = 1024];
+ int32 limit = 3 [(Ydb.value) = "[1; 100]"];
+
+ message Filter {
+ QueryContent.QueryType query_type = 1;
+ repeated QueryMeta.ComputeStatus status = 2 [(Ydb.size).le = 20];
+ repeated ExecuteMode mode = 3 [(Ydb.size).le = 20];
+ string name = 4 [(Ydb.length).le = 1024]; // queries whose name contains the filter.name substring
+ bool created_by_me = 5;
+ Acl.Visibility visibility = 6;
+ AutomaticType automatic = 7;
+ }
+ Filter filter = 4;
+}
+
+message ListQueriesResponse {
+ Ydb.Operations.Operation operation = 1; // ListQueriesResult
+}
+
+message ListQueriesResult {
+ repeated BriefQuery query = 1;
+ string next_page_token = 2 [(Ydb.length).le = 1024];
+}
+
+// Getting complete information about the query
+message DescribeQueryRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string query_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+}
+
+message DescribeQueryResponse {
+ Ydb.Operations.Operation operation = 1; // DescribeQueryResult
+}
+
+message DescribeQueryResult {
+ Query query = 1;
+}
+
+// Getting status of the query
+message GetQueryStatusRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string query_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+}
+
+message GetQueryStatusResponse {
+ Ydb.Operations.Operation operation = 1; // GetQueryStatusResult
+}
+
+message GetQueryStatusResult {
+ QueryMeta.ComputeStatus status = 1;
+ int64 meta_revision = 2;
+}
+
+// Complete removal of query. Recovery of the query after this operation is not possible
+message DeleteQueryRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string query_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+ int64 previous_revision = 3 [(Ydb.value) = ">= 0"];
+ string idempotency_key = 4 [(Ydb.length).le = 1024];
+}
+
+message DeleteQueryResponse {
+ Ydb.Operations.Operation operation = 1; // DeleteQueryResult
+}
+
+message DeleteQueryResult {
+}
+
+// Change query information with launch policy option. All fields must be filled in the request. The query changes completely
+message ModifyQueryRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string query_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+ QueryContent content = 3;
+ ExecuteMode execute_mode = 4;
+ StreamingDisposition disposition = 5;
+ StateLoadMode state_load_mode = 6;
+ int64 previous_revision = 7 [(Ydb.value) = ">= 0"];
+ string idempotency_key = 8 [(Ydb.length).le = 1024];
+}
+
+message ModifyQueryResponse {
+ Ydb.Operations.Operation operation = 1; // ModifyQueryResult
+}
+
+message ModifyQueryResult {
+}
+
+// Managing query status (pause, abort, resume, ...)
+message ControlQueryRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string query_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+ QueryAction action = 3;
+ int64 previous_revision = 4 [(Ydb.value) = ">= 0"];
+ string idempotency_key = 5 [(Ydb.length).le = 1024];
+}
+
+message ControlQueryResponse {
+ Ydb.Operations.Operation operation = 1; // ControlQueryResult
+}
+
+message ControlQueryResult {
+}
+
+// === Job API ===
+
+message BriefJob {
+ CommonMeta meta = 1;
+ QueryMeta query_meta = 3;
+ string query_name = 9;
+ Acl.Visibility visibility = 10;
+ bool automatic = 11;
+ google.protobuf.Timestamp expire_at = 12;
+}
+
+message Job {
+ CommonMeta meta = 1;
+ string text = 2;
+ QueryMeta query_meta = 3;
+ QueryPlan plan = 4;
+ repeated Ydb.Issue.IssueMessage issue = 5;
+ QueryStatistics statistics = 6;
+ repeated ResultSetMeta result_set_meta = 7;
+ QueryAst ast = 8;
+ string query_name = 9;
+ Acl acl = 10;
+ bool automatic = 11;
+ google.protobuf.Timestamp expire_at = 12;
+}
+
+// Information about recent query runs
+message ListJobsRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string page_token = 2 [(Ydb.length).le = 1024];
+ int32 limit = 3 [(Ydb.value) = "[1; 100]"];
+ string query_id = 5; // deprecated
+
+ message Filter {
+ string query_id = 1 [(Ydb.length).le = 1024];
+ bool created_by_me = 2;
+ }
+ Filter filter = 4;
+}
+
+message ListJobsResponse {
+ Ydb.Operations.Operation operation = 1; // ListJobsResult
+}
+
+message ListJobsResult {
+ repeated BriefJob job = 1;
+ string next_page_token = 2 [(Ydb.length).le = 1024];
+}
+
+// Getting information about the job
+message DescribeJobRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string job_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+}
+
+message DescribeJobResponse {
+ Ydb.Operations.Operation operation = 1; // DescribeJobResult
+}
+
+message DescribeJobResult {
+ Job job = 1;
+}
+
+// === Connection API ===
+
+message CurrentIAMTokenAuth {
+}
+
+message NoneAuth {
+}
+
+message ServiceAccountAuth {
+ string id = 1 [(Ydb.length).le = 1024];
+}
+
+message IamAuth {
+ oneof identity {
+ CurrentIAMTokenAuth current_iam = 1;
+ ServiceAccountAuth service_account = 2;
+ NoneAuth none = 3;
+ }
+}
+
+message DataStreams {
+ string database_id = 1 [(Ydb.length).le = 1024];
+ IamAuth auth = 2;
+
+ // for internal usage
+ string endpoint = 3 [(Ydb.length).le = 1024];
+ string database = 4 [(Ydb.length).le = 1024];
+ bool secure = 5;
+}
+
+message Monitoring {
+ string project = 1 [(Ydb.length).le = 200];
+ string cluster = 2 [(Ydb.length).le = 200];
+ IamAuth auth = 3;
+}
+
+message YdbDatabase {
+ string database_id = 1 [(Ydb.length).le = 1024];
+ IamAuth auth = 2;
+
+ // for internal usage
+ string endpoint = 3 [(Ydb.length).le = 1024];
+ string database = 4 [(Ydb.length).le = 1024];
+ bool secure = 5;
+}
+
+message ClickHouseCluster {
+ string database_id = 1 [(Ydb.length).le = 1024];
+ string login = 2 [(Ydb.length).le = 1024, (Ydb.sensitive) = true];
+ string password = 3 [(Ydb.length).le = 1024, (Ydb.sensitive) = true];
+ IamAuth auth = 4;
+
+ // for internal usage
+ string host = 5 [(Ydb.length).le = 1024];
+ int32 port = 6 [(Ydb.value) = "[0; 65536]"];
+ bool secure = 7;
+}
+
+message ObjectStorageConnection {
+ string bucket = 1 [(Ydb.length).le = 1024];
+ IamAuth auth = 2;
+}
+
+message ConnectionSetting {
+ enum ConnectionType {
+ CONNECTION_TYPE_UNSPECIFIED = 0;
+ YDB_DATABASE = 1;
+ CLICKHOUSE_CLUSTER = 2;
+ DATA_STREAMS = 3;
+ OBJECT_STORAGE = 4;
+ MONITORING = 5;
+ }
+
+ oneof connection {
+ YdbDatabase ydb_database = 1;
+ ClickHouseCluster clickhouse_cluster = 2;
+ DataStreams data_streams = 3;
+ ObjectStorageConnection object_storage = 4;
+ Monitoring monitoring = 5;
+ }
+}
+
+message ConnectionContent {
+ string name = 1 [(Ydb.length).range = {min: 1, max: 1024}];
+ ConnectionSetting setting = 2;
+ Acl acl = 3;
+ string description = 4 [(Ydb.length).le = 10240];
+}
+
+message Connection {
+ ConnectionContent content = 1;
+ CommonMeta meta = 2;
+}
+
+// Create a new connection
+message CreateConnectionRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ ConnectionContent content = 2;
+ string idempotency_key = 3 [(Ydb.length).le = 1024];
+}
+
+message CreateConnectionResponse {
+ Ydb.Operations.Operation operation = 1; // CreateConnectionResult
+}
+
+message CreateConnectionResult {
+ string connection_id = 1 [(Ydb.length).range = {min: 1, max: 1024}];
+}
+
+// Getting information about connections
+message ListConnectionsRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string page_token = 2 [(Ydb.length).le = 1024];
+ int32 limit = 3 [(Ydb.value) = "[1; 100]"];
+
+ message Filter {
+ string name = 1 [(Ydb.length).le = 1024]; // connections whose name contains the filter.name substring
+ bool created_by_me = 2;
+ ConnectionSetting.ConnectionType connection_type = 3;
+ }
+ Filter filter = 4;
+}
+
+message ListConnectionsResponse {
+ Ydb.Operations.Operation operation = 1; // ListConnectionsResult
+}
+
+message ListConnectionsResult {
+ repeated Connection connection = 1;
+ string next_page_token = 2 [(Ydb.length).le = 1024];
+}
+
+// Getting information about the connection
+message DescribeConnectionRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string connection_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+}
+
+message DescribeConnectionResponse {
+ Ydb.Operations.Operation operation = 1; // DescribeConnectionResult
+}
+
+message DescribeConnectionResult {
+ Connection connection = 1;
+}
+
+// Change connection information. All fields must be filled in the request. The connection changes completely
+message ModifyConnectionRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string connection_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+ ConnectionContent content = 3;
+ int64 previous_revision = 4 [(Ydb.value) = ">= 0"];
+ string idempotency_key = 5 [(Ydb.length).le = 1024];
+}
+
+message ModifyConnectionResponse {
+ Ydb.Operations.Operation operation = 1; // ModifyConnectionResult
+}
+
+message ModifyConnectionResult {
+}
+
+// Complete removal of connection. Recovery of the connection after this operation is not possible
+message DeleteConnectionRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string connection_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+ int64 previous_revision = 3 [(Ydb.value) = ">= 0"];
+ string idempotency_key = 4 [(Ydb.length).le = 1024];
+}
+
+message DeleteConnectionResponse {
+ Ydb.Operations.Operation operation = 1; // DeleteConnectionResult
+}
+
+message DeleteConnectionResult {
+}
+
+message TestConnectionRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ ConnectionSetting setting = 2;
+}
+
+message TestConnectionResponse {
+ Ydb.Operations.Operation operation = 1; // TestConnectionResult
+}
+
+message TestConnectionResult {
+}
+
+// ResultSet API
+
+// Getting the result of the query execution
+message GetResultDataRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string query_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+ int32 result_set_index = 3 [(Ydb.value) = ">= 0"];
+ int64 offset = 4 [(Ydb.value) = ">= 0"];
+ int64 limit = 5 [(Ydb.value) = "[1; 1000]"];
+}
+
+message GetResultDataResponse {
+ Ydb.Operations.Operation operation = 1; // GetResultDataResult
+}
+
+message GetResultDataResult {
+ Ydb.ResultSet result_set = 1;
+}
+
+// Binding API
+
+message Schema {
+ repeated Ydb.Column column = 1 [(Ydb.size).le = 100];
+}
+
+message DataStreamsBinding {
+ string stream_name = 1 [(Ydb.length).range = {min: 1, max: 1024}];
+ string format = 2 [(Ydb.length).le = 1024];
+ string compression = 3 [(Ydb.length).le = 1024];
+ Schema schema = 4;
+ map<string, string> format_setting = 5 [(Ydb.size).le = 100];
+}
+
+message ObjectStorageBinding {
+ message Subset {
+ string path_pattern = 1 [(Ydb.length).range = {min: 1, max: 1024}];
+ string format = 2 [(Ydb.length).le = 1024];
+ map<string, string> format_setting = 3 [(Ydb.size).le = 100];
+ string compression = 4 [(Ydb.length).le = 1024];
+ Schema schema = 5;
+ }
+
+ repeated Subset subset = 1;
+}
+
+message BindingSetting {
+ enum BindingType {
+ BINDING_TYPE_UNSPECIFIED = 0;
+ DATA_STREAMS = 1;
+ OBJECT_STORAGE = 2;
+ }
+
+ oneof binding {
+ DataStreamsBinding data_streams = 1;
+ ObjectStorageBinding object_storage = 2;
+ }
+}
+
+message BriefBinding {
+ string name = 1 [(Ydb.length).range = {min: 1, max: 1024}];
+ string connection_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+ CommonMeta meta = 3;
+ BindingSetting.BindingType type = 4;
+}
+
+message BindingContent {
+ string name = 1 [(Ydb.length).range = {min: 1, max: 1024}];
+ string connection_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+ BindingSetting setting = 3;
+ Acl acl = 4;
+ string description = 5 [(Ydb.length).le = 10240];
+}
+
+message Binding {
+ BindingContent content = 1;
+ CommonMeta meta = 2;
+}
+
+// Create a new binding
+message CreateBindingRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ BindingContent content = 2;
+ string idempotency_key = 3 [(Ydb.length).le = 1024];
+}
+
+message CreateBindingResponse {
+ Ydb.Operations.Operation operation = 1; // CreateBindingResult
+}
+
+message CreateBindingResult {
+ string binding_id = 1 [(Ydb.length).range = {min: 1, max: 1024}];
+}
+
+// Getting information about bindings
+message ListBindingsRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string page_token = 2 [(Ydb.length).le = 1024];
+ int32 limit = 3 [(Ydb.value) = "[1; 100]"];
+
+ message Filter {
+ string connection_id = 1 [(Ydb.length).le = 1024];
+ string name = 2 [(Ydb.length).le = 1024]; // bindings whose name contains the filter.name substring
+ bool created_by_me = 3;
+ }
+ Filter filter = 4;
+}
+
+message ListBindingsResponse {
+ Ydb.Operations.Operation operation = 1; // ListBindingsResult
+}
+
+message ListBindingsResult {
+ repeated BriefBinding binding = 1;
+ string next_page_token = 2 [(Ydb.length).le = 1024];
+}
+
+// Getting information about the binding
+message DescribeBindingRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string binding_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+}
+
+message DescribeBindingResponse {
+ Ydb.Operations.Operation operation = 1; // DescribeBindingResult
+}
+
+message DescribeBindingResult {
+ Binding binding = 1;
+}
+
+// Change binding information. All fields must be filled in the request. The binding changes completely
+message ModifyBindingRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string binding_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+ BindingContent content = 3;
+ int64 previous_revision = 4 [(Ydb.value) = ">= 0"];
+ string idempotency_key = 5 [(Ydb.length).le = 1024];
+}
+
+message ModifyBindingResponse {
+ Ydb.Operations.Operation operation = 1; // ModifyBindingResult
+}
+
+message ModifyBindingResult {
+}
+
+// Complete removal of binding. Recovery of the binding after this operation is not possible
+message DeleteBindingRequest {
+ Ydb.Operations.OperationParams operation_params = 1;
+ string binding_id = 2 [(Ydb.length).range = {min: 1, max: 1024}];
+ int64 previous_revision = 3 [(Ydb.value) = ">= 0"];
+ string idempotency_key = 4 [(Ydb.length).le = 1024];
+}
+
+message DeleteBindingResponse {
+ Ydb.Operations.Operation operation = 1; // DeleteBindingResult
+}
+
+message DeleteBindingResult {
+}
diff --git a/ydb/public/api/protos/yq.proto b/ydb/public/api/protos/yq.proto
index 9ecf9026959..82f83acddc9 100644
--- a/ydb/public/api/protos/yq.proto
+++ b/ydb/public/api/protos/yq.proto
@@ -1,3 +1,5 @@
+// DEPRECATED!! use fq.proto
+
syntax = "proto3";
option cc_enable_arenas = true;
diff --git a/ydb/public/lib/fq/CMakeLists.txt b/ydb/public/lib/fq/CMakeLists.txt
new file mode 100644
index 00000000000..06d407b7d9a
--- /dev/null
+++ b/ydb/public/lib/fq/CMakeLists.txt
@@ -0,0 +1,21 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(public-lib-fq)
+target_link_libraries(public-lib-fq PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-json
+ api-grpc-draft
+ cpp-client-ydb_table
+)
+target_sources(public-lib-fq PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/fq/fq.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/public/lib/fq/scope.cpp
+)
diff --git a/ydb/public/lib/fq/fq.cpp b/ydb/public/lib/fq/fq.cpp
new file mode 100644
index 00000000000..59c4d5cb22c
--- /dev/null
+++ b/ydb/public/lib/fq/fq.cpp
@@ -0,0 +1,754 @@
+#include "fq.h"
+
+#define INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
+#undef INCLUDE_YDB_INTERNAL_H
+
+#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
+
+namespace NYdb::NFq {
+
+using namespace NYdb;
+
+class TClient::TImpl : public TClientImplCommon<TClient::TImpl> {
+public:
+ TImpl(std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TCommonClientSettings& settings)
+ : TClientImplCommon(std::move(connections), settings) {}
+
+ template<class TProtoResult, class TResultWrapper>
+ auto MakeResultExtractor(NThreading::TPromise<TResultWrapper> promise) {
+ return [promise = std::move(promise)]
+ (google::protobuf::Any* any, TPlainStatus status) mutable {
+ std::unique_ptr<TProtoResult> result;
+ if (any) {
+ result.reset(new TProtoResult);
+ any->UnpackTo(result.get());
+ }
+
+ promise.SetValue(
+ TResultWrapper(
+ TStatus(std::move(status)),
+ std::move(result)));
+ };
+ }
+
+ TAsyncCreateQueryResult CreateQuery(
+ const FederatedQuery::CreateQueryRequest& protoRequest,
+ const TCreateQuerySettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::CreateQueryRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TCreateQueryResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::CreateQueryResult,
+ TCreateQueryResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::CreateQueryRequest,
+ FederatedQuery::CreateQueryResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncCreateQuery,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncListQueriesResult ListQueries(
+ const FederatedQuery::ListQueriesRequest& protoRequest,
+ const TListQueriesSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::ListQueriesRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TListQueriesResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::ListQueriesResult,
+ TListQueriesResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::ListQueriesRequest,
+ FederatedQuery::ListQueriesResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncListQueries,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncDescribeQueryResult DescribeQuery(
+ const FederatedQuery::DescribeQueryRequest& protoRequest,
+ const TDescribeQuerySettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::DescribeQueryRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TDescribeQueryResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::DescribeQueryResult,
+ TDescribeQueryResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::DescribeQueryRequest,
+ FederatedQuery::DescribeQueryResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncDescribeQuery,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncGetQueryStatusResult GetQueryStatus(
+ const FederatedQuery::GetQueryStatusRequest& protoRequest,
+ const TGetQueryStatusSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::GetQueryStatusRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TGetQueryStatusResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::GetQueryStatusResult,
+ TGetQueryStatusResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::GetQueryStatusRequest,
+ FederatedQuery::GetQueryStatusResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncGetQueryStatus,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncModifyQueryResult ModifyQuery(
+ const FederatedQuery::ModifyQueryRequest& protoRequest,
+ const TModifyQuerySettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::ModifyQueryRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TModifyQueryResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::ModifyQueryResult,
+ TModifyQueryResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::ModifyQueryRequest,
+ FederatedQuery::ModifyQueryResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncModifyQuery,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncDeleteQueryResult DeleteQuery(
+ const FederatedQuery::DeleteQueryRequest& protoRequest,
+ const TDeleteQuerySettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::DeleteQueryRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TDeleteQueryResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::DeleteQueryResult,
+ TDeleteQueryResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::DeleteQueryRequest,
+ FederatedQuery::DeleteQueryResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncDeleteQuery,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncControlQueryResult ControlQuery(
+ const FederatedQuery::ControlQueryRequest& protoRequest,
+ const TControlQuerySettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::ControlQueryRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TControlQueryResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::ControlQueryResult,
+ TControlQueryResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::ControlQueryRequest,
+ FederatedQuery::ControlQueryResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncControlQuery,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncGetResultDataResult GetResultData(
+ const FederatedQuery::GetResultDataRequest& protoRequest,
+ const TGetResultDataSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::GetResultDataRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TGetResultDataResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::GetResultDataResult,
+ TGetResultDataResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::GetResultDataRequest,
+ FederatedQuery::GetResultDataResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncGetResultData,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncListJobsResult ListJobs(
+ const FederatedQuery::ListJobsRequest& protoRequest,
+ const TListJobsSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::ListJobsRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TListJobsResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::ListJobsResult,
+ TListJobsResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::ListJobsRequest,
+ FederatedQuery::ListJobsResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncListJobs,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncDescribeJobResult DescribeJob(
+ const FederatedQuery::DescribeJobRequest& protoRequest,
+ const TDescribeJobSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::DescribeJobRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TDescribeJobResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::DescribeJobResult,
+ TDescribeJobResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::DescribeJobRequest,
+ FederatedQuery::DescribeJobResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncDescribeJob,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncCreateConnectionResult CreateConnection(
+ const FederatedQuery::CreateConnectionRequest& protoRequest,
+ const TCreateConnectionSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::CreateConnectionRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TCreateConnectionResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::CreateConnectionResult,
+ TCreateConnectionResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::CreateConnectionRequest,
+ FederatedQuery::CreateConnectionResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncCreateConnection,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncListConnectionsResult ListConnections(
+ const FederatedQuery::ListConnectionsRequest& protoRequest,
+ const TListConnectionsSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::ListConnectionsRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TListConnectionsResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::ListConnectionsResult,
+ TListConnectionsResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::ListConnectionsRequest,
+ FederatedQuery::ListConnectionsResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncListConnections,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncDescribeConnectionResult DescribeConnection(
+ const FederatedQuery::DescribeConnectionRequest& protoRequest,
+ const TDescribeConnectionSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::DescribeConnectionRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TDescribeConnectionResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::DescribeConnectionResult,
+ TDescribeConnectionResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::DescribeConnectionRequest,
+ FederatedQuery::DescribeConnectionResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncDescribeConnection,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncModifyConnectionResult ModifyConnection(
+ const FederatedQuery::ModifyConnectionRequest& protoRequest,
+ const TModifyConnectionSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::ModifyConnectionRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TModifyConnectionResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::ModifyConnectionResult,
+ TModifyConnectionResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::ModifyConnectionRequest,
+ FederatedQuery::ModifyConnectionResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncModifyConnection,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncDeleteConnectionResult DeleteConnection(
+ const FederatedQuery::DeleteConnectionRequest& protoRequest,
+ const TDeleteConnectionSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::DeleteConnectionRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TDeleteConnectionResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::DeleteConnectionResult,
+ TDeleteConnectionResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::DeleteConnectionRequest,
+ FederatedQuery::DeleteConnectionResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncDeleteConnection,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncTestConnectionResult TestConnection(
+ const FederatedQuery::TestConnectionRequest& protoRequest,
+ const TTestConnectionSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::TestConnectionRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TTestConnectionResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::TestConnectionResult,
+ TTestConnectionResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::TestConnectionRequest,
+ FederatedQuery::TestConnectionResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncTestConnection,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncCreateBindingResult CreateBinding(
+ const FederatedQuery::CreateBindingRequest& protoRequest,
+ const TCreateBindingSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::CreateBindingRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TCreateBindingResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::CreateBindingResult,
+ TCreateBindingResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::CreateBindingRequest,
+ FederatedQuery::CreateBindingResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncCreateBinding,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncListBindingsResult ListBindings(
+ const FederatedQuery::ListBindingsRequest& protoRequest,
+ const TListBindingsSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::ListBindingsRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TListBindingsResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::ListBindingsResult,
+ TListBindingsResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::ListBindingsRequest,
+ FederatedQuery::ListBindingsResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncListBindings,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncDescribeBindingResult DescribeBinding(
+ const FederatedQuery::DescribeBindingRequest& protoRequest,
+ const TDescribeBindingSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::DescribeBindingRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TDescribeBindingResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::DescribeBindingResult,
+ TDescribeBindingResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::DescribeBindingRequest,
+ FederatedQuery::DescribeBindingResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncDescribeBinding,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncModifyBindingResult ModifyBinding(
+ const FederatedQuery::ModifyBindingRequest& protoRequest,
+ const TModifyBindingSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::ModifyBindingRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TModifyBindingResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::ModifyBindingResult,
+ TModifyBindingResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::ModifyBindingRequest,
+ FederatedQuery::ModifyBindingResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncModifyBinding,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+
+ TAsyncDeleteBindingResult DeleteBinding(
+ const FederatedQuery::DeleteBindingRequest& protoRequest,
+ const TDeleteBindingSettings& settings) {
+ auto request = MakeOperationRequest<FederatedQuery::DeleteBindingRequest>(settings);
+ request = protoRequest;
+
+ auto promise = NThreading::NewPromise<TDeleteBindingResult>();
+ auto future = promise.GetFuture();
+
+ auto extractor = MakeResultExtractor<
+ FederatedQuery::DeleteBindingResult,
+ TDeleteBindingResult>(std::move(promise));
+
+ Connections_->RunDeferred<
+ FederatedQuery::V1::FederatedQueryService,
+ FederatedQuery::DeleteBindingRequest,
+ FederatedQuery::DeleteBindingResponse>(
+ std::move(request),
+ std::move(extractor),
+ &FederatedQuery::V1::FederatedQueryService::Stub::AsyncDeleteBinding,
+ DbDriverState_,
+ INITIAL_DEFERRED_CALL_DELAY,
+ TRpcRequestSettings::Make(settings),
+ settings.ClientTimeout_);
+
+ return future;
+ }
+};
+
+TClient::TClient(const TDriver& driver, const TCommonClientSettings& settings)
+ : Impl_(new TImpl(CreateInternalInterface(driver), settings))
+{}
+
+TAsyncCreateQueryResult TClient::CreateQuery(
+ const FederatedQuery::CreateQueryRequest& request,
+ const TCreateQuerySettings& settings) {
+ return Impl_->CreateQuery(request, settings);
+}
+
+TAsyncListQueriesResult TClient::ListQueries(
+ const FederatedQuery::ListQueriesRequest& request,
+ const TListQueriesSettings& settings) {
+ return Impl_->ListQueries(request, settings);
+}
+
+TAsyncDescribeQueryResult TClient::DescribeQuery(
+ const FederatedQuery::DescribeQueryRequest& request,
+ const TDescribeQuerySettings& settings) {
+ return Impl_->DescribeQuery(request, settings);
+}
+
+TAsyncGetQueryStatusResult TClient::GetQueryStatus(
+ const FederatedQuery::GetQueryStatusRequest& request,
+ const TGetQueryStatusSettings& settings) {
+ return Impl_->GetQueryStatus(request, settings);
+}
+
+TAsyncModifyQueryResult TClient::ModifyQuery(
+ const FederatedQuery::ModifyQueryRequest& request,
+ const TModifyQuerySettings& settings) {
+ return Impl_->ModifyQuery(request, settings);
+}
+
+TAsyncDeleteQueryResult TClient::DeleteQuery(
+ const FederatedQuery::DeleteQueryRequest& request,
+ const TDeleteQuerySettings& settings) {
+ return Impl_->DeleteQuery(request, settings);
+}
+
+TAsyncControlQueryResult TClient::ControlQuery(
+ const FederatedQuery::ControlQueryRequest& request,
+ const TControlQuerySettings& settings) {
+ return Impl_->ControlQuery(request, settings);
+}
+
+TAsyncGetResultDataResult TClient::GetResultData(
+ const FederatedQuery::GetResultDataRequest& request,
+ const TGetResultDataSettings& settings) {
+ return Impl_->GetResultData(request, settings);
+}
+
+TAsyncListJobsResult TClient::ListJobs(
+ const FederatedQuery::ListJobsRequest& request,
+ const TListJobsSettings& settings) {
+ return Impl_->ListJobs(request, settings);
+}
+
+TAsyncDescribeJobResult TClient::DescribeJob(
+ const FederatedQuery::DescribeJobRequest& request,
+ const TDescribeJobSettings& settings) {
+ return Impl_->DescribeJob(request, settings);
+}
+
+TAsyncCreateConnectionResult TClient::CreateConnection(
+ const FederatedQuery::CreateConnectionRequest& request,
+ const TCreateConnectionSettings& settings) {
+ return Impl_->CreateConnection(request, settings);
+}
+
+TAsyncListConnectionsResult TClient::ListConnections(
+ const FederatedQuery::ListConnectionsRequest& request,
+ const TListConnectionsSettings& settings) {
+ return Impl_->ListConnections(request, settings);
+}
+
+TAsyncDescribeConnectionResult TClient::DescribeConnection(
+ const FederatedQuery::DescribeConnectionRequest& request,
+ const TDescribeConnectionSettings& settings) {
+ return Impl_->DescribeConnection(request, settings);
+}
+
+TAsyncModifyConnectionResult TClient::ModifyConnection(
+ const FederatedQuery::ModifyConnectionRequest& request,
+ const TModifyConnectionSettings& settings) {
+ return Impl_->ModifyConnection(request, settings);
+}
+
+TAsyncDeleteConnectionResult TClient::DeleteConnection(
+ const FederatedQuery::DeleteConnectionRequest& request,
+ const TDeleteConnectionSettings& settings) {
+ return Impl_->DeleteConnection(request, settings);
+}
+
+TAsyncTestConnectionResult TClient::TestConnection(
+ const FederatedQuery::TestConnectionRequest& request,
+ const TTestConnectionSettings& settings) {
+ return Impl_->TestConnection(request, settings);
+}
+
+TAsyncCreateBindingResult TClient::CreateBinding(
+ const FederatedQuery::CreateBindingRequest& request,
+ const TCreateBindingSettings& settings) {
+ return Impl_->CreateBinding(request, settings);
+}
+
+TAsyncListBindingsResult TClient::ListBindings(
+ const FederatedQuery::ListBindingsRequest& request,
+ const TListBindingsSettings& settings) {
+ return Impl_->ListBindings(request, settings);
+}
+
+TAsyncDescribeBindingResult TClient::DescribeBinding(
+ const FederatedQuery::DescribeBindingRequest& request,
+ const TDescribeBindingSettings& settings) {
+ return Impl_->DescribeBinding(request, settings);
+}
+
+TAsyncModifyBindingResult TClient::ModifyBinding(
+ const FederatedQuery::ModifyBindingRequest& request,
+ const TModifyBindingSettings& settings) {
+ return Impl_->ModifyBinding(request, settings);
+}
+
+TAsyncDeleteBindingResult TClient::DeleteBinding(
+ const FederatedQuery::DeleteBindingRequest& request,
+ const TDeleteBindingSettings& settings) {
+ return Impl_->DeleteBinding(request, settings);
+}
+
+}
diff --git a/ydb/public/lib/fq/fq.h b/ydb/public/lib/fq/fq.h
new file mode 100644
index 00000000000..e2d3405fbb6
--- /dev/null
+++ b/ydb/public/lib/fq/fq.h
@@ -0,0 +1,215 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+#include <ydb/public/api/grpc/fq_v1.grpc.pb.h>
+
+namespace NYdb::NFq {
+
+template<class TProtoResult>
+class TProtoResultWrapper : public NYdb::TStatus {
+ friend class TClient;
+
+private:
+ TProtoResultWrapper(
+ NYdb::TStatus&& status,
+ std::unique_ptr<TProtoResult> result)
+ : TStatus(std::move(status))
+ , Result(std::move(result))
+ { }
+
+public:
+ const TProtoResult& GetResult() const {
+ if (!Result) {
+ ythrow yexception() << "Uninitialized result: " << GetIssues().ToString();
+ }
+ return *Result;
+ }
+
+ bool HasResult() const {
+ return Result;
+ }
+
+private:
+ std::unique_ptr<TProtoResult> Result;
+};
+
+using TCreateQueryResult = TProtoResultWrapper<FederatedQuery::CreateQueryResult>;
+using TAsyncCreateQueryResult = NThreading::TFuture<TCreateQueryResult>;
+struct TCreateQuerySettings : public NYdb::TOperationRequestSettings<TCreateQuerySettings> {};
+
+using TListQueriesResult = TProtoResultWrapper<FederatedQuery::ListQueriesResult>;
+using TAsyncListQueriesResult = NThreading::TFuture<TListQueriesResult>;
+struct TListQueriesSettings : public NYdb::TOperationRequestSettings<TListQueriesSettings> {};
+
+using TDescribeQueryResult = TProtoResultWrapper<FederatedQuery::DescribeQueryResult>;
+using TAsyncDescribeQueryResult = NThreading::TFuture<TDescribeQueryResult>;
+struct TDescribeQuerySettings : public NYdb::TOperationRequestSettings<TDescribeQuerySettings> {};
+
+using TGetQueryStatusResult = TProtoResultWrapper<FederatedQuery::GetQueryStatusResult>;
+using TAsyncGetQueryStatusResult = NThreading::TFuture<TGetQueryStatusResult>;
+struct TGetQueryStatusSettings : public NYdb::TOperationRequestSettings<TGetQueryStatusSettings> {};
+
+using TModifyQueryResult = TProtoResultWrapper<FederatedQuery::ModifyQueryResult>;
+using TAsyncModifyQueryResult = NThreading::TFuture<TModifyQueryResult>;
+struct TModifyQuerySettings : public NYdb::TOperationRequestSettings<TModifyQuerySettings> {};
+
+using TDeleteQueryResult = TProtoResultWrapper<FederatedQuery::DeleteQueryResult>;
+using TAsyncDeleteQueryResult = NThreading::TFuture<TDeleteQueryResult>;
+struct TDeleteQuerySettings : public NYdb::TOperationRequestSettings<TDeleteQuerySettings> {};
+
+using TControlQueryResult = TProtoResultWrapper<FederatedQuery::ControlQueryResult>;
+using TAsyncControlQueryResult = NThreading::TFuture<TControlQueryResult>;
+struct TControlQuerySettings : public NYdb::TOperationRequestSettings<TControlQuerySettings> {};
+
+using TGetResultDataResult = TProtoResultWrapper<FederatedQuery::GetResultDataResult>;
+using TAsyncGetResultDataResult = NThreading::TFuture<TGetResultDataResult>;
+struct TGetResultDataSettings : public NYdb::TOperationRequestSettings<TGetResultDataSettings> {};
+
+using TListJobsResult = TProtoResultWrapper<FederatedQuery::ListJobsResult>;
+using TAsyncListJobsResult = NThreading::TFuture<TListJobsResult>;
+struct TListJobsSettings : public NYdb::TOperationRequestSettings<TListJobsSettings> {};
+
+using TDescribeJobResult = TProtoResultWrapper<FederatedQuery::DescribeJobResult>;
+using TAsyncDescribeJobResult = NThreading::TFuture<TDescribeJobResult>;
+struct TDescribeJobSettings : public NYdb::TOperationRequestSettings<TDescribeJobSettings> {};
+
+using TCreateConnectionResult = TProtoResultWrapper<FederatedQuery::CreateConnectionResult>;
+using TAsyncCreateConnectionResult = NThreading::TFuture<TCreateConnectionResult>;
+struct TCreateConnectionSettings : public NYdb::TOperationRequestSettings<TCreateConnectionSettings> {};
+
+using TListConnectionsResult = TProtoResultWrapper<FederatedQuery::ListConnectionsResult>;
+using TAsyncListConnectionsResult = NThreading::TFuture<TListConnectionsResult>;
+struct TListConnectionsSettings : public NYdb::TOperationRequestSettings<TListConnectionsSettings> {};
+
+using TDescribeConnectionResult = TProtoResultWrapper<FederatedQuery::DescribeConnectionResult>;
+using TAsyncDescribeConnectionResult = NThreading::TFuture<TDescribeConnectionResult>;
+struct TDescribeConnectionSettings : public NYdb::TOperationRequestSettings<TDescribeConnectionSettings> {};
+
+using TModifyConnectionResult = TProtoResultWrapper<FederatedQuery::ModifyConnectionResult>;
+using TAsyncModifyConnectionResult = NThreading::TFuture<TModifyConnectionResult>;
+struct TModifyConnectionSettings : public NYdb::TOperationRequestSettings<TModifyConnectionSettings> {};
+
+using TDeleteConnectionResult = TProtoResultWrapper<FederatedQuery::DeleteConnectionResult>;
+using TAsyncDeleteConnectionResult = NThreading::TFuture<TDeleteConnectionResult>;
+struct TDeleteConnectionSettings : public NYdb::TOperationRequestSettings<TDeleteConnectionSettings> {};
+
+using TTestConnectionResult = TProtoResultWrapper<FederatedQuery::TestConnectionResult>;
+using TAsyncTestConnectionResult = NThreading::TFuture<TTestConnectionResult>;
+struct TTestConnectionSettings : public NYdb::TOperationRequestSettings<TTestConnectionSettings> {};
+
+using TCreateBindingResult = TProtoResultWrapper<FederatedQuery::CreateBindingResult>;
+using TAsyncCreateBindingResult = NThreading::TFuture<TCreateBindingResult>;
+struct TCreateBindingSettings : public NYdb::TOperationRequestSettings<TCreateBindingSettings> {};
+
+using TListBindingsResult = TProtoResultWrapper<FederatedQuery::ListBindingsResult>;
+using TAsyncListBindingsResult = NThreading::TFuture<TListBindingsResult>;
+struct TListBindingsSettings : public NYdb::TOperationRequestSettings<TListBindingsSettings> {};
+
+using TDescribeBindingResult = TProtoResultWrapper<FederatedQuery::DescribeBindingResult>;
+using TAsyncDescribeBindingResult = NThreading::TFuture<TDescribeBindingResult>;
+struct TDescribeBindingSettings : public NYdb::TOperationRequestSettings<TDescribeBindingSettings> {};
+
+using TModifyBindingResult = TProtoResultWrapper<FederatedQuery::ModifyBindingResult>;
+using TAsyncModifyBindingResult = NThreading::TFuture<TModifyBindingResult>;
+struct TModifyBindingSettings : public NYdb::TOperationRequestSettings<TModifyBindingSettings> {};
+
+using TDeleteBindingResult = TProtoResultWrapper<FederatedQuery::DeleteBindingResult>;
+using TAsyncDeleteBindingResult = NThreading::TFuture<TDeleteBindingResult>;
+struct TDeleteBindingSettings : public NYdb::TOperationRequestSettings<TDeleteBindingSettings> {};
+
+class TClient {
+ class TImpl;
+
+public:
+ TClient(const NYdb::TDriver& driver, const NYdb::TCommonClientSettings& settings = NYdb::TCommonClientSettings());
+
+ TAsyncCreateQueryResult CreateQuery(
+ const FederatedQuery::CreateQueryRequest& request,
+ const TCreateQuerySettings& settings = TCreateQuerySettings());
+
+ TAsyncListQueriesResult ListQueries(
+ const FederatedQuery::ListQueriesRequest& request,
+ const TListQueriesSettings& settings = TListQueriesSettings());
+
+ TAsyncDescribeQueryResult DescribeQuery(
+ const FederatedQuery::DescribeQueryRequest& request,
+ const TDescribeQuerySettings& settings = TDescribeQuerySettings());
+
+ TAsyncGetQueryStatusResult GetQueryStatus(
+ const FederatedQuery::GetQueryStatusRequest& request,
+ const TGetQueryStatusSettings& settings = TGetQueryStatusSettings());
+
+ TAsyncModifyQueryResult ModifyQuery(
+ const FederatedQuery::ModifyQueryRequest& request,
+ const TModifyQuerySettings& settings = TModifyQuerySettings());
+
+ TAsyncDeleteQueryResult DeleteQuery(
+ const FederatedQuery::DeleteQueryRequest& request,
+ const TDeleteQuerySettings& settings = TDeleteQuerySettings());
+
+ TAsyncControlQueryResult ControlQuery(
+ const FederatedQuery::ControlQueryRequest& request,
+ const TControlQuerySettings& settings = TControlQuerySettings());
+
+ TAsyncGetResultDataResult GetResultData(
+ const FederatedQuery::GetResultDataRequest& request,
+ const TGetResultDataSettings& settings = TGetResultDataSettings());
+
+ TAsyncListJobsResult ListJobs(
+ const FederatedQuery::ListJobsRequest& request,
+ const TListJobsSettings& settings = TListJobsSettings());
+
+ TAsyncDescribeJobResult DescribeJob(
+ const FederatedQuery::DescribeJobRequest& request,
+ const TDescribeJobSettings& settings = TDescribeJobSettings());
+
+ TAsyncCreateConnectionResult CreateConnection(
+ const FederatedQuery::CreateConnectionRequest& request,
+ const TCreateConnectionSettings& settings = TCreateConnectionSettings());
+
+ TAsyncListConnectionsResult ListConnections(
+ const FederatedQuery::ListConnectionsRequest& request,
+ const TListConnectionsSettings& settings = TListConnectionsSettings());
+
+ TAsyncDescribeConnectionResult DescribeConnection(
+ const FederatedQuery::DescribeConnectionRequest& request,
+ const TDescribeConnectionSettings& settings = TDescribeConnectionSettings());
+
+ TAsyncModifyConnectionResult ModifyConnection(
+ const FederatedQuery::ModifyConnectionRequest& request,
+ const TModifyConnectionSettings& settings = TModifyConnectionSettings());
+
+ TAsyncDeleteConnectionResult DeleteConnection(
+ const FederatedQuery::DeleteConnectionRequest& request,
+ const TDeleteConnectionSettings& settings = TDeleteConnectionSettings());
+
+ TAsyncTestConnectionResult TestConnection(
+ const FederatedQuery::TestConnectionRequest& request,
+ const TTestConnectionSettings& settings = TTestConnectionSettings());
+
+ TAsyncCreateBindingResult CreateBinding(
+ const FederatedQuery::CreateBindingRequest& request,
+ const TCreateBindingSettings& settings = TCreateBindingSettings());
+
+ TAsyncListBindingsResult ListBindings(
+ const FederatedQuery::ListBindingsRequest& request,
+ const TListBindingsSettings& settings = TListBindingsSettings());
+
+ TAsyncDescribeBindingResult DescribeBinding(
+ const FederatedQuery::DescribeBindingRequest& request,
+ const TDescribeBindingSettings& settings = TDescribeBindingSettings());
+
+ TAsyncModifyBindingResult ModifyBinding(
+ const FederatedQuery::ModifyBindingRequest& request,
+ const TModifyBindingSettings& settings = TModifyBindingSettings());
+
+ TAsyncDeleteBindingResult DeleteBinding(
+ const FederatedQuery::DeleteBindingRequest& request,
+ const TDeleteBindingSettings& settings = TDeleteBindingSettings());
+
+private:
+ std::shared_ptr<TImpl> Impl_;
+};
+
+} // namespace NYdb::NFq
diff --git a/ydb/public/lib/fq/helpers.h b/ydb/public/lib/fq/helpers.h
new file mode 100644
index 00000000000..7d84a2c1b33
--- /dev/null
+++ b/ydb/public/lib/fq/helpers.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include "scope.h"
+
+namespace NYdb {
+namespace NFq {
+
+template<typename T>
+T CreateFqSettings(const TScope& scope)
+{
+ T settings;
+ settings.Header_ = {
+ { "x-ydb-fq-project", scope.ToString() }
+ };
+ return settings;
+}
+
+template<typename T>
+T CreateFqSettings(const TString& folderId)
+{
+ return CreateFqSettings<T>(TScope{TScope::YandexCloudScopeSchema + "://" + folderId});
+}
+
+} // namespace NFq
+} // namespace Ndb
diff --git a/ydb/public/lib/fq/scope.cpp b/ydb/public/lib/fq/scope.cpp
new file mode 100644
index 00000000000..fac8789e5f5
--- /dev/null
+++ b/ydb/public/lib/fq/scope.cpp
@@ -0,0 +1,9 @@
+#include "scope.h"
+
+namespace NYdb {
+namespace NFq {
+
+TString TScope::YandexCloudScopeSchema = "yandexcloud";
+
+} // namespace NFq
+} // namespace Ndb
diff --git a/ydb/public/lib/fq/scope.h b/ydb/public/lib/fq/scope.h
new file mode 100644
index 00000000000..4d54ea0a8e4
--- /dev/null
+++ b/ydb/public/lib/fq/scope.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include <util/string/builder.h>
+
+namespace NYdb {
+namespace NFq {
+
+class TScope {
+public:
+ static TString YandexCloudScopeSchema;
+
+ TScope()
+ { }
+
+ TScope(const TString& scope)
+ : Scope(scope)
+ { }
+
+ TScope(TString&& scope)
+ : Scope(std::move(scope))
+ { }
+
+ const TString& ToString() const {
+ return Scope;
+ }
+
+ bool Empty() const {
+ return Scope.empty();
+ }
+
+private:
+ TString Scope;
+};
+
+} // namespace NFq
+} // namespace Ndb
diff --git a/ydb/services/fq/CMakeLists.txt b/ydb/services/fq/CMakeLists.txt
new file mode 100644
index 00000000000..75abc50a008
--- /dev/null
+++ b/ydb/services/fq/CMakeLists.txt
@@ -0,0 +1,24 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(ydb-services-fq)
+target_link_libraries(ydb-services-fq PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-grpc-server
+ library-cpp-retry
+ ydb-core-grpc_services
+ core-grpc_services-base
+ ydb-library-protobuf_printer
+ api-grpc
+)
+target_sources(ydb-services-fq PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/fq/grpc_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/fq/private_grpc.cpp
+)
diff --git a/ydb/services/fq/grpc_service.cpp b/ydb/services/fq/grpc_service.cpp
new file mode 100644
index 00000000000..867714214dd
--- /dev/null
+++ b/ydb/services/fq/grpc_service.cpp
@@ -0,0 +1,278 @@
+#include "grpc_service.h"
+
+#include <ydb/core/grpc_services/grpc_helper.h>
+#include <ydb/core/grpc_services/grpc_request_proxy.h>
+#include <ydb/core/grpc_services/rpc_calls.h>
+#include <ydb/core/grpc_services/service_fq.h>
+#include <ydb/library/protobuf_printer/security_printer.h>
+
+namespace NKikimr::NGRpcService {
+
+TGRpcFederatedQueryService::TGRpcFederatedQueryService(NActors::TActorSystem *system,
+ TIntrusivePtr<NMonitoring::TDynamicCounters> counters, NActors::TActorId id)
+ : ActorSystem_(system)
+ , Counters_(counters)
+ , GRpcRequestProxyId_(id) {}
+
+void TGRpcFederatedQueryService::InitService(grpc::ServerCompletionQueue *cq, NGrpc::TLoggerPtr logger) {
+ CQ_ = cq;
+ SetupIncomingRequests(std::move(logger));
+}
+
+void TGRpcFederatedQueryService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
+ Limiter_ = limiter;
+}
+
+bool TGRpcFederatedQueryService::IncRequest() {
+ return Limiter_->Inc();
+}
+
+void TGRpcFederatedQueryService::DecRequest() {
+ Limiter_->Dec();
+ Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
+}
+
+void TGRpcFederatedQueryService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
+ auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
+
+ using NPerms = NKikimr::TEvTicketParser::TEvAuthorizeTicket;
+
+ static const std::function CreateQueryPermissions{[](const FederatedQuery::CreateQueryRequest& request) {
+ TVector<NPerms::TPermission> permissions{
+ NPerms::Required("yq.queries.create"),
+ NPerms::Optional("yq.connections.use"),
+ NPerms::Optional("yq.bindings.use")
+ };
+ if (request.execute_mode() != FederatedQuery::SAVE) {
+ permissions.push_back(NPerms::Required("yq.queries.invoke"));
+ }
+ if (request.content().acl().visibility() == FederatedQuery::Acl::SCOPE) {
+ permissions.push_back(NPerms::Required("yq.resources.managePublic"));
+ }
+ return permissions;
+ }};
+
+ static const std::function ListQueriesPermissions{[](const FederatedQuery::ListQueriesRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.queries.get"),
+ NPerms::Optional("yq.resources.viewPublic"),
+ NPerms::Optional("yq.resources.viewPrivate")
+ };
+ }};
+
+ static const std::function DescribeQueryPermissions{[](const FederatedQuery::DescribeQueryRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.queries.get"),
+ NPerms::Optional("yq.queries.viewAst"),
+ NPerms::Optional("yq.resources.viewPublic"),
+ NPerms::Optional("yq.resources.viewPrivate")
+ };
+ }};
+
+ static const std::function GetQueryStatusPermissions{[](const FederatedQuery::GetQueryStatusRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.queries.getStatus"),
+ NPerms::Optional("yq.resources.viewPublic"),
+ NPerms::Optional("yq.resources.viewPrivate")
+ };
+ }};
+
+ static const std::function ModifyQueryPermissions{[](const FederatedQuery::ModifyQueryRequest& request) {
+ TVector<NPerms::TPermission> permissions{
+ NPerms::Required("yq.queries.update"),
+ NPerms::Optional("yq.connections.use"),
+ NPerms::Optional("yq.bindings.use"),
+ NPerms::Optional("yq.resources.managePrivate")
+ };
+ if (request.execute_mode() != FederatedQuery::SAVE) {
+ permissions.push_back(NPerms::Required("yq.queries.invoke"));
+ }
+ if (request.content().acl().visibility() == FederatedQuery::Acl::SCOPE) {
+ permissions.push_back(NPerms::Required("yq.resources.managePublic"));
+ }
+ return permissions;
+ }};
+
+ static const std::function DeleteQueryPermissions{[](const FederatedQuery::DeleteQueryRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.queries.delete"),
+ NPerms::Optional("yq.resources.managePublic"),
+ NPerms::Optional("yq.resources.managePrivate")
+ };
+ }};
+
+ static const std::function ControlQueryPermissions{[](const FederatedQuery::ControlQueryRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.queries.control"),
+ NPerms::Optional("yq.resources.managePublic"),
+ NPerms::Optional("yq.resources.managePrivate")
+ };
+ }};
+
+ static const std::function GetResultDataPermissions{[](const FederatedQuery::GetResultDataRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.queries.getData"),
+ NPerms::Optional("yq.resources.viewPublic"),
+ NPerms::Optional("yq.resources.viewPrivate")
+ };
+ }};
+
+ static const std::function ListJobsPermissions{[](const FederatedQuery::ListJobsRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.jobs.get"),
+ NPerms::Optional("yq.resources.viewPublic"),
+ NPerms::Optional("yq.resources.viewPrivate")
+ };
+ }};
+
+ static const std::function DescribeJobPermissions{[](const FederatedQuery::DescribeJobRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.jobs.get"),
+ NPerms::Optional("yq.resources.viewPublic"),
+ NPerms::Optional("yq.resources.viewPrivate")
+ };
+ }};
+
+ static const std::function CreateConnectionPermissions{[](const FederatedQuery::CreateConnectionRequest& request) {
+ TVector<NPerms::TPermission> permissions{
+ NPerms::Required("yq.connections.create"),
+ };
+ if (request.content().acl().visibility() == FederatedQuery::Acl::SCOPE) {
+ permissions.push_back(NPerms::Required("yq.resources.managePublic"));
+ }
+ return permissions;
+ }};
+
+ static const std::function ListConnectionsPermissions{[](const FederatedQuery::ListConnectionsRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.connections.get"),
+ NPerms::Optional("yq.resources.viewPublic"),
+ NPerms::Optional("yq.resources.viewPrivate")
+ };
+ }};
+
+ static const std::function DescribeConnectionPermissions{[](const FederatedQuery::DescribeConnectionRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.connections.get"),
+ NPerms::Optional("yq.resources.viewPublic"),
+ NPerms::Optional("yq.resources.viewPrivate")
+ };
+ }};
+
+ static const std::function ModifyConnectionPermissions{[](const FederatedQuery::ModifyConnectionRequest& request) {
+ TVector<NPerms::TPermission> permissions{
+ NPerms::Required("yq.connections.update"),
+ NPerms::Optional("yq.resources.managePrivate")
+ };
+ if (request.content().acl().visibility() == FederatedQuery::Acl::SCOPE) {
+ permissions.push_back(NPerms::Required("yq.resources.managePublic"));
+ }
+ return permissions;
+ }};
+
+ static const std::function DeleteConnectionPermissions{[](const FederatedQuery::DeleteConnectionRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.connections.delete"),
+ NPerms::Optional("yq.resources.managePublic"),
+ NPerms::Optional("yq.resources.managePrivate")
+ };
+ }};
+
+ static const std::function TestConnectionPermissions{[](const FederatedQuery::TestConnectionRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.connections.create")
+ };
+ }};
+
+ static const std::function CreateBindingPermissions{[](const FederatedQuery::CreateBindingRequest&) {
+ TVector<NPerms::TPermission> permissions{
+ NPerms::Required("yq.bindings.create"),
+ };
+ // For use in binding links on connection with visibility SCOPE,
+ // the yq.resources.managePublic permission is required. But there
+ // is no information about connection visibility in this place,
+ // so yq.resources.managePublic is always requested as optional
+ permissions.push_back(NPerms::Optional("yq.resources.managePublic"));
+ return permissions;
+ }};
+
+ static const std::function ListBindingsPermissions{[](const FederatedQuery::ListBindingsRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.bindings.get"),
+ NPerms::Optional("yq.resources.viewPublic"),
+ NPerms::Optional("yq.resources.viewPrivate")
+ };
+ }};
+
+ static const std::function DescribeBindingPermissions{[](const FederatedQuery::DescribeBindingRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.bindings.get"),
+ NPerms::Optional("yq.resources.viewPublic"),
+ NPerms::Optional("yq.resources.viewPrivate")
+ };
+ }};
+
+ static const std::function ModifyBindingPermissions{[](const FederatedQuery::ModifyBindingRequest&) {
+ TVector<NPerms::TPermission> permissions{
+ NPerms::Required("yq.bindings.update"),
+ NPerms::Optional("yq.resources.managePrivate")
+ };
+ // For use in binding links on connection with visibility SCOPE,
+ // the yq.resources.managePublic permission is required. But there
+ // is no information about connection visibility in this place,
+ // so yq.resources.managePublic is always requested as optional
+ permissions.push_back(NPerms::Optional("yq.resources.managePublic"));
+ return permissions;
+ }};
+
+ static const std::function DeleteBindingPermissions{[](const FederatedQuery::DeleteBindingRequest&) -> TVector<NPerms::TPermission> {
+ return {
+ NPerms::Required("yq.bindings.delete"),
+ NPerms::Optional("yq.resources.managePublic"),
+ NPerms::Optional("yq.resources.managePrivate")
+ };
+ }};
+
+#ifdef ADD_REQUEST
+#error ADD_REQUEST macro already defined
+#endif
+#define ADD_REQUEST(NAME, CB, PERMISSIONS) \
+MakeIntrusive<TGRpcRequest<FederatedQuery::NAME##Request, FederatedQuery::NAME##Response, TGRpcFederatedQueryService, TSecurityTextFormatPrinter<FederatedQuery::NAME##Request>, TSecurityTextFormatPrinter<FederatedQuery::NAME##Response>>>( \
+ this, &Service_, CQ_, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new TGrpcFqRequestOperationCall<FederatedQuery::NAME##Request, FederatedQuery::NAME##Response> \
+ (ctx, &CB, PERMISSIONS)); \
+ }, \
+ &FederatedQuery::V1::FederatedQueryService::AsyncService::Request##NAME, \
+ #NAME, logger, getCounterBlock("fq", #NAME)) \
+ ->Run(); \
+
+ ADD_REQUEST(CreateQuery, DoFederatedQueryCreateQueryRequest, CreateQueryPermissions)
+ ADD_REQUEST(ListQueries, DoFederatedQueryListQueriesRequest, ListQueriesPermissions)
+ ADD_REQUEST(DescribeQuery, DoFederatedQueryDescribeQueryRequest, DescribeQueryPermissions)
+ ADD_REQUEST(GetQueryStatus, DoFederatedQueryGetQueryStatusRequest, GetQueryStatusPermissions)
+ ADD_REQUEST(ModifyQuery, DoFederatedQueryModifyQueryRequest, ModifyQueryPermissions)
+ ADD_REQUEST(DeleteQuery, DoFederatedQueryDeleteQueryRequest, DeleteQueryPermissions)
+ ADD_REQUEST(ControlQuery, DoFederatedQueryControlQueryRequest, ControlQueryPermissions)
+ ADD_REQUEST(GetResultData, DoGetResultDataRequest, GetResultDataPermissions)
+ ADD_REQUEST(ListJobs, DoListJobsRequest, ListJobsPermissions)
+ ADD_REQUEST(DescribeJob, DoDescribeJobRequest, DescribeJobPermissions)
+ ADD_REQUEST(CreateConnection, DoCreateConnectionRequest, CreateConnectionPermissions)
+ ADD_REQUEST(ListConnections, DoListConnectionsRequest, ListConnectionsPermissions)
+ ADD_REQUEST(DescribeConnection, DoDescribeConnectionRequest, DescribeConnectionPermissions)
+ ADD_REQUEST(ModifyConnection, DoModifyConnectionRequest, ModifyConnectionPermissions)
+ ADD_REQUEST(DeleteConnection, DoDeleteConnectionRequest, DeleteConnectionPermissions)
+ ADD_REQUEST(TestConnection, DoTestConnectionRequest, TestConnectionPermissions)
+ ADD_REQUEST(CreateBinding, DoCreateBindingRequest, CreateBindingPermissions)
+ ADD_REQUEST(ListBindings, DoListBindingsRequest, ListBindingsPermissions)
+ ADD_REQUEST(DescribeBinding, DoDescribeBindingRequest, DescribeBindingPermissions)
+ ADD_REQUEST(ModifyBinding, DoModifyBindingRequest, ModifyBindingPermissions)
+ ADD_REQUEST(DeleteBinding, DoDeleteBindingRequest, DeleteBindingPermissions)
+
+#undef ADD_REQUEST
+
+}
+
+} // namespace NKikimr::NGRpcService
diff --git a/ydb/services/fq/grpc_service.h b/ydb/services/fq/grpc_service.h
new file mode 100644
index 00000000000..4bc79d0e8a7
--- /dev/null
+++ b/ydb/services/fq/grpc_service.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include <library/cpp/actors/core/actorsystem.h>
+
+#include <library/cpp/grpc/server/grpc_server.h>
+
+#include <ydb/public/api/grpc/fq_v1.grpc.pb.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class TGRpcFederatedQueryService
+ : public NGrpc::TGrpcServiceBase<FederatedQuery::V1::FederatedQueryService>
+{
+public:
+ TGRpcFederatedQueryService(NActors::TActorSystem* system, TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
+ NActors::TActorId id);
+
+ void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
+ void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
+
+ bool IncRequest();
+ void DecRequest();
+private:
+ void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
+
+ NActors::TActorSystem* ActorSystem_;
+ grpc::ServerCompletionQueue* CQ_ = nullptr;
+
+ TIntrusivePtr<NMonitoring::TDynamicCounters> Counters_;
+ NActors::TActorId GRpcRequestProxyId_;
+ NGrpc::TGlobalLimiter* Limiter_ = nullptr;
+};
+
+} // namespace NGRpcService
+} // namespace NKikimr
diff --git a/ydb/services/fq/private_grpc.cpp b/ydb/services/fq/private_grpc.cpp
new file mode 100644
index 00000000000..d7ce6a320c1
--- /dev/null
+++ b/ydb/services/fq/private_grpc.cpp
@@ -0,0 +1,66 @@
+#include "private_grpc.h"
+
+#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/grpc_helper.h>
+#include <ydb/core/grpc_services/service_fq_internal.h>
+#include <ydb/library/protobuf_printer/security_printer.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+TGRpcFqPrivateTaskService::TGRpcFqPrivateTaskService(NActors::TActorSystem *system,
+ TIntrusivePtr<NMonitoring::TDynamicCounters> counters, NActors::TActorId id)
+ : ActorSystem_(system)
+ , Counters_(counters)
+ , GRpcRequestProxyId_(id) {}
+
+void TGRpcFqPrivateTaskService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) {
+ CQ_ = cq;
+ SetupIncomingRequests(std::move(logger));
+}
+
+void TGRpcFqPrivateTaskService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) {
+ Limiter_ = limiter;
+}
+
+bool TGRpcFqPrivateTaskService::IncRequest() {
+ return Limiter_->Inc();
+}
+
+void TGRpcFqPrivateTaskService::DecRequest() {
+ Limiter_->Dec();
+ Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0);
+}
+
+void TGRpcFqPrivateTaskService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) {
+ auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_);
+
+#ifdef ADD_REQUEST
+#error ADD_REQUEST macro already defined
+#endif
+#define ADD_REQUEST(NAME, CB) \
+MakeIntrusive<TGRpcRequest<Fq::Private::NAME##Request, Fq::Private::NAME##Response, TGRpcFqPrivateTaskService, TSecurityTextFormatPrinter<Fq::Private::NAME##Request>, TSecurityTextFormatPrinter<Fq::Private::NAME##Response>>>( \
+ this, &Service_, CQ_, \
+ [this](NGrpc::IRequestContextBase *ctx) { \
+ NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \
+ ActorSystem_->Send(GRpcRequestProxyId_, \
+ new TGrpcRequestOperationCall<Fq::Private::NAME##Request, Fq::Private::NAME##Response> \
+ (ctx, &CB)); \
+ }, \
+ &Fq::Private::V1::FqPrivateTaskService::AsyncService::Request##NAME, \
+ #NAME, logger, getCounterBlock("fq_internal", #NAME)) \
+ ->Run(); \
+
+ ADD_REQUEST(PingTask, DoFqPrivatePingTaskRequest)
+
+ ADD_REQUEST(GetTask, DoFqPrivateGetTaskRequest)
+
+ ADD_REQUEST(WriteTaskResult, DoFqPrivateWriteTaskResultRequest)
+
+ ADD_REQUEST(NodesHealthCheck, DoFqPrivateNodesHealthCheckRequest)
+
+#undef ADD_REQUEST
+}
+
+} // namespace NGRpcService
+} // namespace NKikimr
diff --git a/ydb/services/fq/private_grpc.h b/ydb/services/fq/private_grpc.h
new file mode 100644
index 00000000000..ddd5252a0e2
--- /dev/null
+++ b/ydb/services/fq/private_grpc.h
@@ -0,0 +1,34 @@
+#pragma once
+
+#include <library/cpp/actors/core/actorsystem.h>
+#include <library/cpp/grpc/server/grpc_server.h>
+#include <ydb/public/api/grpc/draft/yql_db_v1_fq.grpc.pb.h>
+
+namespace NKikimr {
+namespace NGRpcService {
+
+class TGRpcFqPrivateTaskService
+ : public NGrpc::TGrpcServiceBase<Fq::Private::V1::FqPrivateTaskService>
+{
+public:
+ TGRpcFqPrivateTaskService(NActors::TActorSystem* system, TIntrusivePtr<NMonitoring::TDynamicCounters> counters,
+ NActors::TActorId id);
+
+ void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override;
+ void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override;
+
+ bool IncRequest();
+ void DecRequest();
+private:
+ void SetupIncomingRequests(NGrpc::TLoggerPtr logger);
+
+ NActors::TActorSystem* ActorSystem_;
+ grpc::ServerCompletionQueue* CQ_ = nullptr;
+
+ TIntrusivePtr<NMonitoring::TDynamicCounters> Counters_;
+ NActors::TActorId GRpcRequestProxyId_;
+ NGrpc::TGlobalLimiter* Limiter_ = nullptr;
+};
+
+} // namespace NGRpcService
+} // namespace NKikimr
diff --git a/ydb/services/fq/ut_integration/CMakeLists.darwin.txt b/ydb/services/fq/ut_integration/CMakeLists.darwin.txt
new file mode 100644
index 00000000000..d8f6f2a9913
--- /dev/null
+++ b/ydb/services/fq/ut_integration/CMakeLists.darwin.txt
@@ -0,0 +1,61 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-fq-ut_integration)
+target_compile_options(ydb-services-fq-ut_integration PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-fq-ut_integration PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/fq
+)
+target_link_libraries(ydb-services-fq-ut_integration PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-fq
+ library-cpp-getopt
+ cpp-grpc-client
+ cpp-regex-pcre
+ library-cpp-svnversion
+ ydb-core-testlib
+ yq-libs-control_plane_storage
+ yq-libs-db_schema
+ yq-libs-private_client
+ providers-common-db_id_async_resolver
+ yql-sql-pg_dummy
+ common-clickhouse-client
+ library-yql-utils
+ public-lib-fq
+ ydb-services-ydb
+)
+target_link_options(ydb-services-fq-ut_integration PRIVATE
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+)
+target_sources(ydb-services-fq-ut_integration PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/fq/ut_integration/ut_utils.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/fq/ut_integration/fq_ut.cpp
+)
+add_test(
+ NAME
+ ydb-services-fq-ut_integration
+ COMMAND
+ ydb-services-fq-ut_integration
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-services-fq-ut_integration)
diff --git a/ydb/services/fq/ut_integration/CMakeLists.linux.txt b/ydb/services/fq/ut_integration/CMakeLists.linux.txt
new file mode 100644
index 00000000000..a08124095b2
--- /dev/null
+++ b/ydb/services/fq/ut_integration/CMakeLists.linux.txt
@@ -0,0 +1,65 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(ydb-services-fq-ut_integration)
+target_compile_options(ydb-services-fq-ut_integration PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(ydb-services-fq-ut_integration PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/fq
+)
+target_link_libraries(ydb-services-fq-ut_integration PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ ydb-services-fq
+ library-cpp-getopt
+ cpp-grpc-client
+ cpp-regex-pcre
+ library-cpp-svnversion
+ ydb-core-testlib
+ yq-libs-control_plane_storage
+ yq-libs-db_schema
+ yq-libs-private_client
+ providers-common-db_id_async_resolver
+ yql-sql-pg_dummy
+ clickhouse_client_udf
+ library-yql-utils
+ public-lib-fq
+ ydb-services-ydb
+)
+target_link_options(ydb-services-fq-ut_integration PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(ydb-services-fq-ut_integration PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/services/fq/ut_integration/ut_utils.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/services/fq/ut_integration/fq_ut.cpp
+)
+add_test(
+ NAME
+ ydb-services-fq-ut_integration
+ COMMAND
+ ydb-services-fq-ut_integration
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+vcs_info(ydb-services-fq-ut_integration)
diff --git a/ydb/services/fq/ut_integration/CMakeLists.txt b/ydb/services/fq/ut_integration/CMakeLists.txt
new file mode 100644
index 00000000000..fc7b1ee73ce
--- /dev/null
+++ b/ydb/services/fq/ut_integration/CMakeLists.txt
@@ -0,0 +1,13 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (UNIX AND NOT APPLE)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/services/fq/ut_integration/fq_ut.cpp b/ydb/services/fq/ut_integration/fq_ut.cpp
new file mode 100644
index 00000000000..b7024005d90
--- /dev/null
+++ b/ydb/services/fq/ut_integration/fq_ut.cpp
@@ -0,0 +1,998 @@
+#include <ydb/services/ydb/ydb_common_ut.h>
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+
+#include <ydb/public/lib/fq/fq.h>
+#include <ydb/public/lib/fq/helpers.h>
+#include <ydb/core/yq/libs/db_schema/db_schema.h>
+#include <ydb/core/yq/libs/mock/yql_mock.h>
+#include <ydb/core/yq/libs/private_client/private_client_fq.h>
+
+#include <ydb/core/yq/libs/control_plane_storage/message_builders.h>
+#include <ydb/core/yq/libs/actors/database_resolver.h>
+
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+
+#include <library/cpp/protobuf/util/pb_io.h>
+#include <library/cpp/retry/retry.h>
+#include <util/system/mutex.h>
+#include "ut_utils.h"
+#include <google/protobuf/util/time_util.h>
+
+#include <ydb/public/lib/fq/scope.h>
+#include <util/system/hostname.h>
+
+#include <util/string/split.h>
+
+using namespace NYdb;
+using namespace FederatedQuery;
+using namespace NYdb::NFq;
+
+namespace {
+ const ui32 Retries = 10;
+
+ void PrintProtoIssues(const NProtoBuf::RepeatedPtrField<::Ydb::Issue::IssueMessage>& protoIssues) {
+ if (protoIssues.empty()) {
+ Cerr << "No Issues" << Endl;
+ return;
+ }
+ NYql::TIssues issues;
+ NYql::IssuesFromMessage(protoIssues, issues);
+ Cerr << ">>> Issues: " << issues.ToString() << Endl;
+ }
+
+ TString CreateNewHistoryAndWaitFinish(const TString& folderId,
+ NYdb::NFq::TClient& client, const TString& yqlText,
+ const FederatedQuery::QueryMeta::ComputeStatus& expectedStatusResult)
+ {
+ //CreateQuery
+ TString queryId;
+ {
+ auto request = ::NFq::TCreateQueryBuilder{}
+ .SetText(yqlText)
+ .Build();
+ auto result = client.CreateQuery(
+ request, CreateFqSettings<TCreateQuerySettings>(folderId))
+ .ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ queryId = result.GetResult().query_id();
+ }
+ // GetQueryStatus
+ const auto request = ::NFq::TGetQueryStatusBuilder{}
+ .SetQueryId(queryId)
+ .Build();
+ const auto result = DoWithRetryOnRetCode([&]() {
+ auto result = client.GetQueryStatus(
+ request, CreateFqSettings<TGetQueryStatusSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ return result.GetResult().status() == expectedStatusResult;
+ }, TRetryOptions(Retries));
+ UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit");
+
+ return queryId;
+ }
+
+ void CheckGetResultData(
+ NYdb::NFq::TClient& client,
+ const TString& queryId,
+ const TString& folderId,
+ const ui64 rowsCount,
+ const int colsSize,
+ const int answer)
+ {
+ const auto request = ::NFq::TGetResultDataBuilder{}
+ .SetQueryId(queryId)
+ .Build();
+ const auto result = client.GetResultData(
+ request, CreateFqSettings<TGetResultDataSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ const auto& resultSet = result.GetResult().result_set();
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.rows().size(), rowsCount);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.columns().size(), colsSize);
+ if (!resultSet.rows().empty()) {
+ const auto& item = resultSet.rows(0).items(0);
+ TString str = item.DebugString();
+ TVector<TString> arr;
+ StringSplitter(str).Split(' ').SkipEmpty().AddTo(&arr);
+ Y_VERIFY(arr.size() == 2, "Incorrect numeric result");
+ UNIT_ASSERT_VALUES_EQUAL(std::stoi(arr.back()), answer);
+ }
+ }
+}
+
+Y_UNIT_TEST_SUITE(Yq_1) {
+ Y_UNIT_TEST(Basic) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ NYdb::NFq::TClient client(driver);
+ const auto folderId = "some_folder_id";
+ const auto queryId = CreateNewHistoryAndWaitFinish(folderId, client, "select 1", FederatedQuery::QueryMeta::COMPLETED);
+ CheckGetResultData(client, queryId, folderId, 1, 1, 1);
+
+ {
+ const auto request = ::NFq::TDescribeQueryBuilder()
+ .SetQueryId("foo")
+ .Build();
+ const auto result = DoWithRetryOnRetCode([&]() {
+ auto result = client.DescribeQuery(
+ request, CreateFqSettings<TDescribeQuerySettings>("WTF"))
+ .ExtractValueSync();
+ return result.GetStatus() == EStatus::BAD_REQUEST;
+ }, TRetryOptions(Retries));
+ UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit");
+ }
+
+ {
+ auto request = ::NFq::TListQueriesBuilder{}.Build();
+ auto result = DoWithRetryOnRetCode([&]() {
+ auto result = client.ListQueries(
+ request, CreateFqSettings<TListQueriesSettings>("WTF"))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResult().query().size(), 0);
+ return result.GetStatus() == EStatus::SUCCESS;
+ }, TRetryOptions(Retries));
+ UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit");
+ }
+
+ {
+ const auto request = ::NFq::TDescribeQueryBuilder()
+ .SetQueryId(queryId)
+ .Build();
+ auto result = client.DescribeQuery(
+ request, CreateFqSettings<TDescribeQuerySettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ const auto status = result.GetResult().query().meta().status();
+ UNIT_ASSERT(status == FederatedQuery::QueryMeta::COMPLETED);
+ }
+
+ {
+ const auto request = ::NFq::TModifyQueryBuilder()
+ .SetQueryId(queryId)
+ .SetName("MODIFIED_NAME")
+ .Build();
+ const auto result = client.ModifyQuery(
+ request, CreateFqSettings<TModifyQuerySettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const auto request = ::NFq::TDescribeQueryBuilder()
+ .SetQueryId(queryId)
+ .Build();
+ const auto result = client.DescribeQuery(
+ request, CreateFqSettings<TDescribeQuerySettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ const auto& res = result.GetResult();
+ UNIT_ASSERT_VALUES_EQUAL(res.query().content().name(), "MODIFIED_NAME");
+ UNIT_ASSERT(res.query().content().acl().visibility() == static_cast<int>(Acl_Visibility_SCOPE));
+ }
+
+ {
+ const auto request = ::NFq::TDescribeQueryBuilder()
+ .SetQueryId("")
+ .Build();
+ const auto result = client.DescribeQuery(
+ request, CreateFqSettings<TDescribeQuerySettings>(""))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
+ }
+
+ {
+ const auto request = ::NFq::TGetResultDataBuilder()
+ .SetQueryId("")
+ .Build();
+ const auto result = client.GetResultData(
+ request, CreateFqSettings<TGetResultDataSettings>(""))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST(Basic_EmptyTable) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ UpsertToExistingTable(driver, location);
+ NYdb::NFq::TClient client(driver);
+ const TString folderId = "some_folder_id";
+ {
+ const auto request = ::NFq::TCreateConnectionBuilder()
+ .SetName("testdbempty")
+ .CreateYdb("Root", location, "")
+ .Build();
+ const auto result = client
+ .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ const TString queryId = CreateNewHistoryAndWaitFinish(
+ folderId, client,
+ "select count(*) from testdbempty.`yq/empty_table`",
+ FederatedQuery::QueryMeta::COMPLETED);
+ CheckGetResultData(client, queryId, folderId, 1, 1, 0);
+ }
+
+ Y_UNIT_TEST(Basic_EmptyList) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ NYdb::NFq::TClient client(driver);
+ const TString folderId = "some_folder_id";
+ auto expectedStatus = FederatedQuery::QueryMeta::COMPLETED;
+ CreateNewHistoryAndWaitFinish(folderId, client, "select []", expectedStatus);
+ }
+
+ Y_UNIT_TEST(Basic_EmptyDict) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ NYdb::NFq::TClient client(driver);
+ const TString folderId = "some_folder_id";
+ auto expectedStatus = FederatedQuery::QueryMeta::COMPLETED;
+ CreateNewHistoryAndWaitFinish(folderId, client, "select {}", expectedStatus);
+ }
+
+ Y_UNIT_TEST(Basic_Null) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ NYdb::NFq::TClient client(driver);
+ const TString folderId = "some_folder_id";
+ auto expectedStatus = FederatedQuery::QueryMeta::COMPLETED;
+ CreateNewHistoryAndWaitFinish(folderId, client, "select null", expectedStatus);
+ }
+
+ SIMPLE_UNIT_FORKED_TEST(Basic_Tagged) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ NYdb::NFq::TClient client(driver);
+ const TString folderId = "some_folder_id";
+
+
+ {
+ auto request = ::NFq::TCreateConnectionBuilder{}
+ .SetName("testdb00")
+ .CreateYdb("Root", location, "")
+ .Build();
+
+ auto result = client.CreateConnection(
+ request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ auto expectedStatus = FederatedQuery::QueryMeta::COMPLETED;
+ CreateNewHistoryAndWaitFinish(folderId, client, "select AsTagged(count(*), \"tag\") from testdb00.`yq/connections`", expectedStatus);
+ }
+
+ Y_UNIT_TEST(Basic_TaggedLiteral) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ NYdb::NFq::TClient client(driver);
+ const TString folderId = "some_folder_id";
+
+ auto expectedStatus = FederatedQuery::QueryMeta::COMPLETED;
+ CreateNewHistoryAndWaitFinish(folderId, client, "select AsTagged(1, \"tag\")", expectedStatus);
+ }
+
+ // use fork for data test due to ch initialization problem
+ SIMPLE_UNIT_FORKED_TEST(ExtendedDatabaseId) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+
+ NYdb::NFq::TClient client(driver);
+ const TString folderId = "folder_id_" + CreateGuidAsString();
+ {
+ const auto request = ::NFq::TCreateConnectionBuilder()
+ .SetName("testdb01")
+ .CreateYdb("FakeDatabaseId", "")
+ .Build();
+ const auto result = client
+ .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const auto request = ::NFq::TCreateConnectionBuilder()
+ .SetName("testdb02")
+ .CreateYdb("FakeDatabaseId", "")
+ .Build();
+ const auto result = client
+ .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const auto queryId = CreateNewHistoryAndWaitFinish(folderId, client,
+ "select count(*) from testdb01.`yq/connections`", FederatedQuery::QueryMeta::COMPLETED);
+ CheckGetResultData(client, queryId, folderId, 1, 1, 2);
+ }
+
+ {
+ // test connections db with 2 databaseId
+ const auto queryId = CreateNewHistoryAndWaitFinish(folderId, client,
+ "select count(*) from testdb02.`yq/connections`", FederatedQuery::QueryMeta::COMPLETED);
+ CheckGetResultData(client, queryId, folderId, 1, 1, 2);
+ }
+ }
+
+ Y_UNIT_TEST(DescribeConnection) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ NYdb::NFq::TClient client(driver);
+ const TString folderId = "some_folder_id";
+ TString conId;
+ {
+ const auto request = ::NFq::TCreateConnectionBuilder()
+ .SetName("created_conn")
+ .CreateYdb("created_db", "")
+ .Build();
+ const auto result = client
+ .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ conId = result.GetResult().connection_id();
+ }
+ {
+ auto request = ::NFq::TDescribeConnectionBuilder()
+ .SetConnectionId(conId)
+ .Build();
+ auto result = client
+ .DescribeConnection(request, CreateFqSettings<TDescribeConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ const auto& res = result.GetResult().connection();
+ UNIT_ASSERT_VALUES_EQUAL(res.meta().id(), conId);
+ UNIT_ASSERT_VALUES_EQUAL(res.meta().created_by(), "root@builtin");
+ UNIT_ASSERT_VALUES_EQUAL(res.meta().modified_by(), "root@builtin");
+ UNIT_ASSERT_VALUES_EQUAL(res.content().name(), "created_conn");
+ }
+ }
+
+ Y_UNIT_TEST(ListConnections) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ NYdb::NFq::TClient client(driver);
+ const size_t conns = 3;
+ const auto folderId = TString(__func__) + "folder_id";
+ {//CreateConnections
+ for (size_t i = 0; i < conns - 1; ++i) {
+ const auto request = ::NFq::TCreateConnectionBuilder()
+ .SetName("testdb" + ToString(i))
+ .CreateYdb("FakeDatabaseId", "")
+ .Build();
+ const auto result = client
+ .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ // yds
+ const auto request = ::NFq::TCreateConnectionBuilder()
+ .SetName("testdb2")
+ .CreateDataStreams("FakeDatabaseId", "") // We can use the same db in yds and ydb
+ .Build();
+ const auto result = client
+ .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const auto request = ::NFq::TListConnectionsBuilder().Build();
+ auto result = client
+ .ListConnections(request, CreateFqSettings<TListConnectionsSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResult().connection().size(), conns);
+ size_t i = 0;
+ auto res = result.GetResult();
+ auto* conns = res.mutable_connection();
+ std::sort(conns->begin(), conns->end(), [&](const auto& lhs, const auto& rhs) {
+ return lhs.content().name() < rhs.content().name();
+ });
+ for (const auto& conn : *conns) {
+ const auto& content = conn.content();
+ const auto& meta = conn.meta();
+ UNIT_ASSERT_VALUES_EQUAL(content.name(), "testdb" + ToString(i));
+ UNIT_ASSERT_VALUES_EQUAL(meta.created_by(), "root@builtin");
+ UNIT_ASSERT_VALUES_EQUAL(meta.modified_by(), "root@builtin");
+ if (i < 2) {
+ UNIT_ASSERT_C(content.setting().has_ydb_database(), content);
+ } else {
+ UNIT_ASSERT_C(content.setting().has_data_streams(), content);
+ }
+ i++;
+ }
+ }
+ }
+
+ Y_UNIT_TEST(ListConnectionsOnEmptyConnectionsTable) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ NYdb::NFq::TClient client(driver);
+
+ {
+ const auto request = ::NFq::TListConnectionsBuilder().Build();
+ auto result = client
+ .ListConnections(request, CreateFqSettings<TListConnectionsSettings>("WTF"))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT(result.GetResult().connection().empty());
+ }
+ }
+
+ Y_UNIT_TEST(ModifyConnections) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ TString userToken = "root@builtin";
+ TString userId = "root";
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken));
+ NYdb::NFq::TClient client(driver);
+ const auto folderId = TString(__func__) + "folder_id";
+ TString conId;
+
+ {
+ const auto request = ::NFq::TCreateConnectionBuilder()
+ .SetName("created_conn")
+ .CreateYdb("created_db", "")
+ .Build();
+ const auto result = client
+ .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ conId = result.GetResult().connection_id();
+ }
+
+ {//Modify
+ const auto request = ::NFq::TModifyConnectionBuilder()
+ .SetName("modified_name")
+ .SetConnectionId(conId)
+ .CreateYdb("new ydb", "")
+ .SetDescription("Modified")
+ .Build();
+ const auto result = client
+ .ModifyConnection(request, CreateFqSettings<TModifyConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto request = ::NFq::TDescribeConnectionBuilder()
+ .SetConnectionId(conId)
+ .Build();
+ auto result = client
+ .DescribeConnection(request, CreateFqSettings<TDescribeConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ const auto& res = result.GetResult().connection();
+ UNIT_ASSERT_VALUES_EQUAL(res.meta().id(), conId);
+ UNIT_ASSERT_VALUES_EQUAL(res.meta().created_by(), "root@builtin");
+ UNIT_ASSERT_VALUES_EQUAL(res.meta().modified_by(), "root@builtin");
+ UNIT_ASSERT_VALUES_EQUAL(res.content().name(), "modified_name");
+ UNIT_ASSERT_VALUES_EQUAL(res.content().description(), "Modified");
+ }
+ }
+
+ Y_UNIT_TEST(DeleteConnections) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ NYdb::NFq::TClient client(driver);
+ const auto folderId = TString(__func__) + "folder_id";
+ TString conId;
+
+ {
+ const auto request = ::NFq::TCreateConnectionBuilder()
+ .SetName("created_conn")
+ .CreateYdb("created_db", "")
+ .Build();
+ const auto result = client
+ .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ conId = result.GetResult().connection_id();
+ }
+
+ {
+ const auto request = ::NFq::TDeleteConnectionBuilder()
+ .SetConnectionId(conId)
+ .Build();
+
+ const auto result = client
+ .DeleteConnection(request, CreateFqSettings<TDeleteConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST(Create_And_Modify_The_Same_Connection) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ TString userToken = "root@builtin";
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken));
+ NYdb::NFq::TClient client(driver);
+
+ const auto folderId = TString(__func__) + "folder_id";
+ TString conId;
+
+ {
+ const auto request = ::NFq::TCreateConnectionBuilder()
+ .SetName("created_conn")
+ .CreateYdb("created_db", "")
+ .Build();
+ const auto result = client
+ .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ conId = result.GetResult().connection_id();
+ }
+
+ {
+ const auto request = ::NFq::TModifyConnectionBuilder()
+ .SetConnectionId(conId)
+ .CreateYdb("modified_db", "")//TODO remove
+ .Build();
+ const auto result = client
+ .ModifyConnection(request, CreateFqSettings<TModifyConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST(CreateConnection_With_Existing_Name) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ TString userToken = "root@builtin";
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken));
+ NYdb::NFq::TClient client(driver);
+
+ const auto folderId = TString(__func__) + "folder_id";
+ auto name = TString(__func__) + "_name";
+ name.to_lower();
+
+ {
+ const auto request = ::NFq::TCreateConnectionBuilder()
+ .SetName(name)
+ .CreateYdb("created_db", "")
+ .Build();
+ const auto result = client
+ .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ const auto request = ::NFq::TCreateConnectionBuilder()
+ .SetName(name)
+ .CreateYdb("created_db", "")
+ .Build();
+ const auto result = client
+ .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::BAD_REQUEST, result.GetIssues().ToString()); //TODO status should be ALREADY_EXISTS
+ }
+ }
+
+ Y_UNIT_TEST(CreateConnections_With_Idempotency) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ TString userToken = "root@builtin";
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken));
+ NYdb::NFq::TClient client(driver);
+
+ const auto folderId = TString(__func__) + "folder_id";
+ const auto name = "connection_name";
+ const TString idempotencyKey = "idempotency_key";
+ TString conId;
+
+ {
+ const auto request = ::NFq::TCreateConnectionBuilder()
+ .SetName(name)
+ .SetIdempotencyKey(idempotencyKey)
+ .CreateYdb("created_db", "")
+ .Build();
+ const auto result = client
+ .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ conId = result.GetResult().connection_id();
+ }
+
+ {
+ const auto request = ::NFq::TCreateConnectionBuilder()
+ .SetName(name)
+ .SetIdempotencyKey(idempotencyKey)
+ .CreateYdb("created_db", "")
+ .Build();
+ const auto result = client
+ .CreateConnection(request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_C(result.GetStatus() == EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(conId, result.GetResult().connection_id());
+ }
+ }
+
+ Y_UNIT_TEST(CreateQuery_With_Idempotency) {//TODO Fix
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ TString userToken = "root@builtin";
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken));
+ NYdb::NFq::TClient client(driver);
+
+ const auto folderId = TString(__func__) + "folder_id";
+ const TString idempotencyKey = "idempotency_key";
+ const TString yqlText = "select 1";
+ TString queryId;
+ const auto request = ::NFq::TCreateQueryBuilder{}
+ .SetText(yqlText)
+ .SetIdempotencyKey(idempotencyKey)
+ .Build();
+
+ {
+ auto result = client.CreateQuery(
+ request, CreateFqSettings<TCreateQuerySettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ queryId = result.GetResult().query_id();
+ }
+
+ {
+ const auto req = ::NFq::TDescribeQueryBuilder{}
+ .SetQueryId(queryId)
+ .Build();
+ const auto result = DoWithRetryOnRetCode([&]() {
+ auto result = client.DescribeQuery(
+ req, CreateFqSettings<TDescribeQuerySettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ const auto status = result.GetResult().query().meta().status();
+ PrintProtoIssues(result.GetResult().query().issue());
+ return status == FederatedQuery::QueryMeta::COMPLETED;
+ }, TRetryOptions(Retries));
+ UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit");
+ }
+ {
+ auto result = client.CreateQuery(
+ request, CreateFqSettings<TCreateQuerySettings>(folderId))
+ .ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(queryId, result.GetResult().query_id());
+ }
+ CheckGetResultData(client, queryId, folderId, 1, 1, 1);
+ }
+
+ // use fork for data test due to ch initialization problem
+ SIMPLE_UNIT_FORKED_TEST(CreateQuery_Without_Connection) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ TString userToken = "root@builtin";
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken));
+ NYdb::NFq::TClient client(driver);
+
+ const TString yqlText = "select count(*) from testdbWTF.`connections`";
+ CreateNewHistoryAndWaitFinish("folder_id_WTF", client,
+ yqlText, FederatedQuery::QueryMeta::FAILED);
+ }
+
+ Y_UNIT_TEST(DeleteQuery) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ TString userToken = "root@builtin";
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken));
+ NYdb::NFq::TClient client(driver);
+
+ const auto folderId = TString(__func__) + "folder_id";
+ const TString yqlText = "select 1";
+ const TString queryId = CreateNewHistoryAndWaitFinish(folderId, client,
+ yqlText, FederatedQuery::QueryMeta::COMPLETED);
+ CheckGetResultData(client, queryId, folderId, 1, 1, 1);
+
+ {
+ const auto request = ::NFq::TDeleteQueryBuilder()
+ .SetQueryId(queryId)
+ .Build();
+ auto result = client
+ .DeleteQuery(request, CreateFqSettings<TDeleteQuerySettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto request = ::NFq::TDescribeQueryBuilder()
+ .SetQueryId(queryId)
+ .Build();
+ auto result = client
+ .DescribeQuery(request, CreateFqSettings<TDescribeQuerySettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::BAD_REQUEST, result.GetIssues().ToString());
+ }
+ }
+
+ Y_UNIT_TEST(ModifyQuery) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ TString userToken = "root@builtin";
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken(userToken));
+ NYdb::NFq::TClient client(driver);
+
+ const auto folderId = TString(__func__) + "folder_id";
+ const TString yqlText = "select 1";
+ const TString queryId = CreateNewHistoryAndWaitFinish(folderId, client,
+ yqlText, FederatedQuery::QueryMeta::COMPLETED);
+ CheckGetResultData(client, queryId, folderId, 1, 1, 1);
+
+ {
+ const auto request = ::NFq::TModifyQueryBuilder()
+ .SetQueryId(queryId)
+ .SetName("ModifiedName")
+ .SetDescription("OK")
+ .Build();
+ auto result = client
+ .ModifyQuery(request, CreateFqSettings<TModifyQuerySettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto request = ::NFq::TDescribeQueryBuilder()
+ .SetQueryId(queryId)
+ .Build();
+ auto result = client
+ .DescribeQuery(request, CreateFqSettings<TDescribeQuerySettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ const auto& query = result.GetResult().query();
+ UNIT_ASSERT_VALUES_EQUAL(query.content().name(), "ModifiedName");
+ UNIT_ASSERT_VALUES_EQUAL(query.content().description(), "OK");
+ }
+ }
+
+ Y_UNIT_TEST(DescribeJob) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ NYdb::NFq::TClient client(driver);
+ const auto folderId = "some_folder_id";
+ const auto queryId = CreateNewHistoryAndWaitFinish(folderId, client, "select 1", FederatedQuery::QueryMeta::COMPLETED);
+ CheckGetResultData(client, queryId, folderId, 1, 1, 1);
+ TString jobId;
+
+ {
+ auto request = ::NFq::TListJobsBuilder{}.SetQueryId(queryId).Build();
+ auto result = DoWithRetryOnRetCode([&]() {
+ auto result = client.ListJobs(
+ request, CreateFqSettings<TListJobsSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResult().job_size(), 1);
+ jobId = result.GetResult().job(0).meta().id();
+ return result.GetStatus() == EStatus::SUCCESS;
+ }, TRetryOptions(Retries));
+ UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit");
+ }
+
+ {
+ const auto request = ::NFq::TDescribeJobBuilder()
+ .SetJobId(jobId)
+ .Build();
+ auto result = client.DescribeJob(
+ request, CreateFqSettings<TDescribeJobSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResult().job().query_meta().common().id(), queryId);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResult().job().meta().id(), jobId);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetResult().job().query_name(), "test_query_name_1");
+ }
+ }
+
+ Y_UNIT_TEST(DescribeQuery) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ NYdb::NFq::TClient client(driver);
+ const auto folderId = "some_folder_id";
+ const auto queryId = CreateNewHistoryAndWaitFinish(folderId, client, "select 1", FederatedQuery::QueryMeta::COMPLETED);
+ CheckGetResultData(client, queryId, folderId, 1, 1, 1);
+ TString jobId;
+
+ {
+ const auto request = ::NFq::TDescribeQueryBuilder()
+ .SetQueryId(queryId)
+ .Build();
+ auto result = client.DescribeQuery(
+ request, CreateFqSettings<TDescribeQuerySettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ const auto query = result.GetResult().query();
+ UNIT_ASSERT_VALUES_EQUAL(FederatedQuery::QueryMeta::ComputeStatus_Name(query.meta().status()), FederatedQuery::QueryMeta::ComputeStatus_Name(FederatedQuery::QueryMeta::COMPLETED));
+ UNIT_ASSERT_VALUES_EQUAL(query.content().text(), "select 1");
+ UNIT_ASSERT_VALUES_EQUAL(query.content().name(), "test_query_name_1");
+ }
+ }
+}
+
+Y_UNIT_TEST_SUITE(Yq_2) {
+ // use fork for data test due to ch initialization problem
+ Y_UNIT_TEST(Test_HostNameTrasformation) {
+ UNIT_ASSERT_VALUES_EQUAL(::NYq::TransformMdbHostToCorrectFormat("rc1c-p5waby2y5y1kb5ue.mdb.yandexcloud.net"), "rc1c-p5waby2y5y1kb5ue.db.yandex.net:8443");
+ UNIT_ASSERT_VALUES_EQUAL(::NYq::TransformMdbHostToCorrectFormat("xxx.xxx"), "xxx.db.yandex.net:8443");
+ UNIT_ASSERT_VALUES_EQUAL(::NYq::TransformMdbHostToCorrectFormat("host."), "host.db.yandex.net:8443");
+ }
+
+ SIMPLE_UNIT_FORKED_TEST(ReadFromYdbOverYq) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ NYdb::NFq::TClient client(driver);
+ const auto folderId = TString(__func__) + "folder_id";
+
+ {
+ auto request = ::NFq::TCreateConnectionBuilder{}
+ .SetName("testdb00")
+ .CreateYdb("Root", location, "")
+ .Build();
+
+ auto result = client.CreateConnection(
+ request, CreateFqSettings<TCreateConnectionSettings>(folderId))
+ .ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ TString queryId;
+ {
+ auto request = ::NFq::TCreateQueryBuilder{}
+ .SetText("select count(*) from testdb00.`yq/connections`")
+ .Build();
+ auto result = client.CreateQuery(
+ request, CreateFqSettings<TCreateQuerySettings>(folderId))
+ .ExtractValueSync();
+
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ queryId = result.GetResult().query_id();
+ }
+
+ {
+ auto request = ::NFq::TDescribeQueryBuilder{}.SetQueryId(queryId).Build();
+ auto result = DoWithRetryOnRetCode([&]() {
+ auto result = client.DescribeQuery(
+ request, CreateFqSettings<TDescribeQuerySettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ const auto status = result.GetResult().query().meta().status();
+ PrintProtoIssues(result.GetResult().query().issue());
+ return status == FederatedQuery::QueryMeta::COMPLETED;
+ }, TRetryOptions(10));
+ UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit");
+ }
+
+ {
+ auto request = ::NFq::TGetResultDataBuilder{}.SetQueryId(queryId).Build();
+ auto result = client.GetResultData(
+ request, CreateFqSettings<TGetResultDataSettings>(folderId))
+ .ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+
+ const auto& resultSet = result.GetResult().result_set();
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.rows().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.columns().size(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.rows(0).items(0).uint64_value(), 1);
+ }
+ }
+}
+
+Y_UNIT_TEST_SUITE(PrivateApi) {
+ Y_UNIT_TEST(PingTask) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ ::NFq::TPrivateClient client(driver);
+ const TString historyId = "id";
+ const TString folderId = "folder_id";
+ const TScope scope(folderId);
+ {
+ Fq::Private::PingTaskRequest req;
+ req.mutable_query_id()->set_value("id");
+ req.set_scope(scope.ToString());
+ req.set_owner_id("some_owner");
+ req.set_status(FederatedQuery::QueryMeta::COMPLETED);
+ auto result = client.PingTask(std::move(req)).ExtractValueSync();
+ result.GetIssues().PrintTo(Cerr);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR);
+ }
+ }
+
+ Y_UNIT_TEST(GetTask) {//PendingFetcher can take task first
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ ::NFq::TPrivateClient client(driver);
+ {
+ Fq::Private::GetTaskRequest req;
+ req.set_owner_id("owner_id");
+ req.set_host("host");
+ auto result = client.GetTask(std::move(req)).ExtractValueSync();
+ result.GetIssues().PrintTo(Cerr);
+ UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
+ result.GetIssues().PrintTo(Cerr);
+ }
+ }
+
+ Y_UNIT_TEST(Nodes) {
+ TKikimrWithGrpcAndRootSchema server({}, {}, {}, true);
+ ui16 grpc = server.GetPort();
+ TString location = TStringBuilder() << "localhost:" << grpc;
+ auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin"));
+ ::NFq::TPrivateClient client(driver);
+ const auto instanceId = CreateGuidAsString();
+ {
+ Fq::Private::NodesHealthCheckRequest req;
+ req.set_tenant("Tenant");
+ auto& node = *req.mutable_node();
+ node.set_hostname("hostname");
+ node.set_node_id(100500);
+ node.set_instance_id(instanceId);
+ const auto result = DoWithRetryOnRetCode([&]() {
+ auto r = req;
+ auto result = client.NodesHealthCheck(std::move(r)).ExtractValueSync();
+ if (result.GetStatus() == EStatus::SUCCESS) {
+ const auto& res = result.GetResult();
+ UNIT_ASSERT(!res.nodes().empty());
+ UNIT_ASSERT_VALUES_EQUAL(res.nodes(0).hostname(), "hostname");
+ UNIT_ASSERT_VALUES_EQUAL(res.nodes(0).node_id(), 100500);
+ UNIT_ASSERT_VALUES_EQUAL(res.nodes(0).instance_id(), instanceId);
+ }
+ // result.GetIssues().PrintTo(Cerr);
+ return result.GetStatus() == EStatus::SUCCESS;
+ }, TRetryOptions(Retries));
+ UNIT_ASSERT_C(result, "the execution of the query did not end within the time limit");
+ }
+ }
+}
diff --git a/ydb/services/fq/ut_integration/ut_utils.cpp b/ydb/services/fq/ut_integration/ut_utils.cpp
new file mode 100644
index 00000000000..ccbe7b1998f
--- /dev/null
+++ b/ydb/services/fq/ut_integration/ut_utils.cpp
@@ -0,0 +1,92 @@
+#include "ut_utils.h"
+
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+#include <ydb/public/lib/fq/scope.h>
+#include <ydb/core/yq/libs/actors/proxy.h>
+#include <ydb/core/yq/libs/events/events.h>
+
+#include <library/cpp/time_provider/time_provider.h>
+#include <library/cpp/testing/common/env.h>
+#include <library/cpp/protobuf/util/pb_io.h>
+#include <library/cpp/actors/testlib/test_runtime.h>
+
+#include <ydb/library/yql/utils/bind_in_range.h>
+
+#include <util/system/file.h>
+#include <util/stream/str.h>
+#include <util/string/printf.h>
+#include <util/string/builder.h>
+
+#include <library/cpp/protobuf/json/proto2json.h>
+
+using namespace NYdb;
+using namespace NYdb::NFq;
+using namespace NActors;
+
+void UpsertToExistingTable(TDriver& driver, const TString& location){
+ Y_UNUSED(location);
+ const TString tablePrefix = "Root/yq";
+ NYdb::NTable::TClientSettings settings;
+ NYdb::NTable::TTableClient client(driver, settings);
+ auto sessionOp = client.CreateSession().ExtractValueSync();
+ if (!sessionOp.IsSuccess()) {
+ ythrow yexception() << sessionOp.GetStatus() << "\n" << sessionOp.GetIssues().ToString();
+ }
+ auto session = sessionOp.GetSession();
+ auto timeProvider = CreateDefaultTimeProvider();
+ auto now = timeProvider->Now();
+ NYdb::TParamsBuilder paramsBuilder;
+
+ paramsBuilder.AddParam("$now").Timestamp(now).Build();
+ auto params = paramsBuilder.Build();
+
+ const TString scope = TScope("some_folder_id").ToString();
+
+ {
+ auto result = session.ExecuteSchemeQuery(
+ Sprintf(R"___(
+ --!syntax_v1
+ CREATE TABLE `%s/%s` (
+ id String,
+ PRIMARY KEY (id)
+ );
+ )___", tablePrefix.c_str(), "empty_table")
+ ).ExtractValueSync();
+ if (!result.IsSuccess()) {
+ ythrow yexception() << result.GetStatus() << "\n" << result.GetIssues().ToString();
+ }
+ }
+
+ {
+ NYdb::TParamsBuilder paramsBuilder;
+ auto result = session.ExecuteDataQuery(
+ Sprintf(R"___(
+ --!syntax_v1
+ upsert into `%s/empty_table`
+ (id)
+ values
+ ("test_id1"), ("test_id2");
+ )___", tablePrefix.c_str()),
+ NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(),
+ paramsBuilder.Build(),
+ NYdb::NTable::TExecDataQuerySettings()).ExtractValueSync();
+ if (!result.IsSuccess()) {
+ ythrow yexception() << result.GetStatus() << "\n" << result.GetIssues().ToString();
+ }
+ }
+
+ {
+ NYdb::TParamsBuilder paramsBuilder;
+ auto result = session.ExecuteDataQuery(
+ Sprintf(R"___(
+ --!syntax_v1
+ delete from `%s/empty_table`;
+ )___", tablePrefix.c_str()),
+ NYdb::NTable::TTxControl::BeginTx(NYdb::NTable::TTxSettings::SerializableRW()).CommitTx(),
+ paramsBuilder.Build(),
+ NYdb::NTable::TExecDataQuerySettings()).ExtractValueSync();
+ if (!result.IsSuccess()) {
+ ythrow yexception() << result.GetStatus() << "\n" << result.GetIssues().ToString();
+ }
+ }
+}
diff --git a/ydb/services/fq/ut_integration/ut_utils.h b/ydb/services/fq/ut_integration/ut_utils.h
new file mode 100644
index 00000000000..a627c45929c
--- /dev/null
+++ b/ydb/services/fq/ut_integration/ut_utils.h
@@ -0,0 +1,9 @@
+#pragma once
+
+#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
+#include <util/system/shellcommand.h>
+#include <contrib/libs/curl/include/curl/curl.h>
+#include <library/cpp/json/json_reader.h>
+
+void UpsertToExistingTable(NYdb::TDriver& driver, const TString& location);
+
diff --git a/ydb/services/yq/grpc_service.cpp b/ydb/services/yq/grpc_service.cpp
index a7505e18d0d..c198b1c992c 100644
--- a/ydb/services/yq/grpc_service.cpp
+++ b/ydb/services/yq/grpc_service.cpp
@@ -256,20 +256,20 @@ MakeIntrusive<TGRpcRequest<YandexQuery::NAME##Request, YandexQuery::NAME##Respon
ADD_REQUEST(ModifyQuery, DoYandexQueryModifyQueryRequest, ModifyQueryPermissions)
ADD_REQUEST(DeleteQuery, DoYandexQueryDeleteQueryRequest, DeleteQueryPermissions)
ADD_REQUEST(ControlQuery, DoYandexQueryControlQueryRequest, ControlQueryPermissions)
- ADD_REQUEST(GetResultData, DoGetResultDataRequest, GetResultDataPermissions)
- ADD_REQUEST(ListJobs, DoListJobsRequest, ListJobsPermissions)
- ADD_REQUEST(DescribeJob, DoDescribeJobRequest, DescribeJobPermissions)
- ADD_REQUEST(CreateConnection, DoCreateConnectionRequest, CreateConnectionPermissions)
- ADD_REQUEST(ListConnections, DoListConnectionsRequest, ListConnectionsPermissions)
- ADD_REQUEST(DescribeConnection, DoDescribeConnectionRequest, DescribeConnectionPermissions)
- ADD_REQUEST(ModifyConnection, DoModifyConnectionRequest, ModifyConnectionPermissions)
- ADD_REQUEST(DeleteConnection, DoDeleteConnectionRequest, DeleteConnectionPermissions)
- ADD_REQUEST(TestConnection, DoTestConnectionRequest, TestConnectionPermissions)
- ADD_REQUEST(CreateBinding, DoCreateBindingRequest, CreateBindingPermissions)
- ADD_REQUEST(ListBindings, DoListBindingsRequest, ListBindingsPermissions)
- ADD_REQUEST(DescribeBinding, DoDescribeBindingRequest, DescribeBindingPermissions)
- ADD_REQUEST(ModifyBinding, DoModifyBindingRequest, ModifyBindingPermissions)
- ADD_REQUEST(DeleteBinding, DoDeleteBindingRequest, DeleteBindingPermissions)
+ ADD_REQUEST(GetResultData, DoYandexQueryGetResultDataRequest, GetResultDataPermissions)
+ ADD_REQUEST(ListJobs, DoYandexQueryListJobsRequest, ListJobsPermissions)
+ ADD_REQUEST(DescribeJob, DoYandexQueryDescribeJobRequest, DescribeJobPermissions)
+ ADD_REQUEST(CreateConnection, DoYandexQueryCreateConnectionRequest, CreateConnectionPermissions)
+ ADD_REQUEST(ListConnections, DoYandexQueryListConnectionsRequest, ListConnectionsPermissions)
+ ADD_REQUEST(DescribeConnection, DoYandexQueryDescribeConnectionRequest, DescribeConnectionPermissions)
+ ADD_REQUEST(ModifyConnection, DoYandexQueryModifyConnectionRequest, ModifyConnectionPermissions)
+ ADD_REQUEST(DeleteConnection, DoYandexQueryDeleteConnectionRequest, DeleteConnectionPermissions)
+ ADD_REQUEST(TestConnection, DoYandexQueryTestConnectionRequest, TestConnectionPermissions)
+ ADD_REQUEST(CreateBinding, DoYandexQueryCreateBindingRequest, CreateBindingPermissions)
+ ADD_REQUEST(ListBindings, DoYandexQueryListBindingsRequest, ListBindingsPermissions)
+ ADD_REQUEST(DescribeBinding, DoYandexQueryDescribeBindingRequest, DescribeBindingPermissions)
+ ADD_REQUEST(ModifyBinding, DoYandexQueryModifyBindingRequest, ModifyBindingPermissions)
+ ADD_REQUEST(DeleteBinding, DoYandexQueryDeleteBindingRequest, DeleteBindingPermissions)
#undef ADD_REQUEST