diff options
author | d-mokhnatkin <[email protected]> | 2022-07-21 11:11:11 +0300 |
---|---|---|
committer | d-mokhnatkin <[email protected]> | 2022-07-21 11:11:11 +0300 |
commit | b651f08845dea01bbd05b6b7d56aba3992374706 (patch) | |
tree | eecd2915f1917a7f306fe68184b7a8ac4e5b6d6b | |
parent | 341f0f2ef841c5277ec8520325d8f15521442cd0 (diff) |
move to new private api
51 files changed, 239 insertions, 946 deletions
diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index c063e630c05..83780af363d 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -104,7 +104,6 @@ #include <ydb/services/ydb/ydb_scripting.h> #include <ydb/services/ydb/ydb_table.h> #include <ydb/services/yq/grpc_service.h> -#include <ydb/services/yq/private_grpc.h> #include <ydb/core/yq/libs/init/init.h> @@ -740,11 +739,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { if (hasYandexQuery) { server.AddService(new NGRpcService::TGRpcYandexQueryService(ActorSystem.Get(), Counters, grpcRequestProxyId)); server.AddService(new NGRpcService::TGRpcFederatedQueryService(ActorSystem.Get(), Counters, grpcRequestProxyId)); - // TODO: REMOVE next line after migration to "yq_private" - server.AddService(new NGRpcService::TGRpcYqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId)); server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId)); } /* REMOVE */ else /* THIS else as well and separate ifs */ if (hasYandexQueryPrivate) { - server.AddService(new NGRpcService::TGRpcYqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId)); server.AddService(new NGRpcService::TGRpcFqPrivateTaskService(ActorSystem.Get(), Counters, grpcRequestProxyId)); } diff --git a/ydb/core/grpc_services/CMakeLists.txt b/ydb/core/grpc_services/CMakeLists.txt index 90ffceb87cc..d2d3a58de2d 100644 --- a/ydb/core/grpc_services/CMakeLists.txt +++ b/ydb/core/grpc_services/CMakeLists.txt @@ -118,5 +118,4 @@ target_sources(ydb-core-grpc_services PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_yq.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/table_profiles.cpp ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/table_settings.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/grpc_services/rpc_analytics_internal.cpp ) diff --git a/ydb/core/grpc_services/rpc_analytics_internal.cpp b/ydb/core/grpc_services/rpc_analytics_internal.cpp deleted file mode 100644 index 09b0ca48dff..00000000000 --- a/ydb/core/grpc_services/rpc_analytics_internal.cpp +++ /dev/null @@ -1,117 +0,0 @@ -#include "service_analytics_internal.h" -#include "rpc_common.h" -#include "rpc_deferrable.h" - -#include <ydb/core/yq/libs/events/events.h> -#include <ydb/core/yq/libs/actors/proxy_private.h> - -#include <library/cpp/actors/core/hfunc.h> - -#include <ydb/core/grpc_services/base/base.h> -#include <ydb/core/yq/libs/protos/yq_private.pb.h> - -namespace NKikimr { -namespace NGRpcService { - -using TEvYqPrivatePingTaskRequest = - TGrpcRequestOperationCall<Yq::Private::PingTaskRequest, Yq::Private::PingTaskResponse>; -using TEvYqPrivateGetTaskRequest = - TGrpcRequestOperationCall<Yq::Private::GetTaskRequest, Yq::Private::GetTaskResponse>; -using TEvYqPrivateWriteTaskResultRequest = - TGrpcRequestOperationCall<Yq::Private::WriteTaskResultRequest, Yq::Private::WriteTaskResultResponse>; -using TEvYqPrivateNodesHealthCheckRequest = - TGrpcRequestOperationCall<Yq::Private::NodesHealthCheckRequest, Yq::Private::NodesHealthCheckResponse>; - -namespace { - template <typename TEv, typename TReq> - void SendResponse(const TEv& ev, TReq& req) { - if (!ev->Get()->Record) { - req.RaiseIssues(ev->Get()->Issues); - req.ReplyWithYdbStatus(ev->Get()->Status); - } else { - req.SendResult(*ev->Get()->Record, ev->Get()->Status); - } - } - -} - -template <typename RpcRequestType, typename EvRequestType, typename EvResponseType> -class TYqPrivateRequestRPC : public TRpcOperationRequestActor< - TYqPrivateRequestRPC<RpcRequestType,EvRequestType,EvResponseType>, RpcRequestType> { - - using TBase = TRpcOperationRequestActor< - TYqPrivateRequestRPC<RpcRequestType,EvRequestType,EvResponseType>, - RpcRequestType>; - -public: - TYqPrivateRequestRPC(IRequestOpCtx* request) : TBase(request) {} - - void Bootstrap(const TActorContext& ctx) { - Y_UNUSED(ctx); - const auto req = this->GetProtoRequest(); - auto ev = MakeHolder<EvRequestType>(); - auto request = dynamic_cast<RpcRequestType*>(this->Request_.get()); - Y_VERIFY(request); - auto proxyCtx = dynamic_cast<IRequestProxyCtx*>(request); - Y_VERIFY(proxyCtx); - TString user; - const TString& internalToken = proxyCtx->GetInternalToken(); - if (internalToken) { - NACLib::TUserToken userToken(internalToken); - user = userToken.GetUserSID(); - } - ev->Record = *req; - ev->User = user; - this->Send(NYq::MakeYqPrivateProxyId(), ev.Release()); - this->Become(&TYqPrivateRequestRPC<RpcRequestType, EvRequestType, EvResponseType>::StateFunc); - } - -private: - STRICT_STFUNC(StateFunc, - HFunc(EvResponseType, Handle); - ) - - void Handle(typename EvResponseType::TPtr& ev, const TActorContext& ctx) { - SendResponse(ev, *this->Request_); - this->Die(ctx); - } -}; - -using TYqPrivatePingTaskRPC = TYqPrivateRequestRPC< - TEvYqPrivatePingTaskRequest, - NYq::TEvents::TEvPingTaskRequest, - NYq::TEvents::TEvPingTaskResponse>; - -using TYqPrivateGetTaskRPC = TYqPrivateRequestRPC< - TEvYqPrivateGetTaskRequest, - NYq::TEvents::TEvGetTaskRequest, - NYq::TEvents::TEvGetTaskResponse>; - -using TYqPrivateWriteTaskResultRPC = TYqPrivateRequestRPC< - TEvYqPrivateWriteTaskResultRequest, - NYq::TEvents::TEvWriteTaskResultRequest, - NYq::TEvents::TEvWriteTaskResultResponse>; - -using TYqPrivateNodesHealthCheckRPC = TYqPrivateRequestRPC< - TEvYqPrivateNodesHealthCheckRequest, - NYq::TEvents::TEvNodesHealthCheckRequest, - NYq::TEvents::TEvNodesHealthCheckResponse>; - -void DoYqPrivatePingTaskRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYqPrivatePingTaskRPC(p.release())); -} - -void DoYqPrivateGetTaskRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYqPrivateGetTaskRPC(p.release())); -} - -void DoYqPrivateWriteTaskResultRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYqPrivateWriteTaskResultRPC(p.release())); -} - -void DoYqPrivateNodesHealthCheckRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { - TActivationContext::AsActorContext().Register(new TYqPrivateNodesHealthCheckRPC(p.release())); -} - -} // namespace NGRpcService -} // namespace NKikimr diff --git a/ydb/core/grpc_services/rpc_fq_internal.cpp b/ydb/core/grpc_services/rpc_fq_internal.cpp index b2f9c60c796..fc7c56ab3f3 100644 --- a/ydb/core/grpc_services/rpc_fq_internal.cpp +++ b/ydb/core/grpc_services/rpc_fq_internal.cpp @@ -22,12 +22,12 @@ using TEvFqPrivateWriteTaskResultRequest = using TEvFqPrivateNodesHealthCheckRequest = TGrpcRequestOperationCall<Fq::Private::NodesHealthCheckRequest, Fq::Private::NodesHealthCheckResponse>; -template <typename RpcRequestType, typename EvRequestType, typename EvResponseType, typename CastRequest, typename CastResult> +template <typename RpcRequestType, typename EvRequestType, typename EvResponseType> class TFqPrivateRequestRPC : public TRpcOperationRequestActor< - TFqPrivateRequestRPC<RpcRequestType,EvRequestType,EvResponseType,CastRequest,CastResult>, RpcRequestType> { + TFqPrivateRequestRPC<RpcRequestType,EvRequestType,EvResponseType>, RpcRequestType> { using TBase = TRpcOperationRequestActor< - TFqPrivateRequestRPC<RpcRequestType,EvRequestType,EvResponseType,CastRequest,CastResult>, + TFqPrivateRequestRPC<RpcRequestType,EvRequestType,EvResponseType>, RpcRequestType>; public: @@ -36,10 +36,6 @@ public: void Bootstrap(const TActorContext& ctx) { Y_UNUSED(ctx); const auto req = this->GetProtoRequest(); - TProtoStringType protoString; - Y_ENSURE(req->SerializeToString(&protoString)); - auto castedRequest = CastRequest(); - Y_ENSURE(castedRequest.ParseFromString(protoString)); auto ev = MakeHolder<EvRequestType>(); auto request = dynamic_cast<RpcRequestType*>(this->Request_.get()); Y_VERIFY(request); @@ -51,10 +47,10 @@ public: NACLib::TUserToken userToken(internalToken); user = userToken.GetUserSID(); } - ev->Record = castedRequest; + ev->Record = *req; ev->User = user; this->Send(NYq::MakeYqPrivateProxyId(), ev.Release()); - this->Become(&TFqPrivateRequestRPC<RpcRequestType, EvRequestType, EvResponseType, CastRequest, CastResult>::StateFunc); + this->Become(&TFqPrivateRequestRPC<RpcRequestType, EvRequestType, EvResponseType>::StateFunc); } private: @@ -73,12 +69,7 @@ private: req.RaiseIssues(ev->Get()->Issues); req.ReplyWithYdbStatus(ev->Get()->Status); } else { - TProtoStringType protoString; - Y_ENSURE(ev->Get()->Record->SerializeToString(&protoString)); - auto castedResult = CastResult(); - Y_ENSURE(castedResult.ParseFromString(protoString)); - - req.SendResult(castedResult, ev->Get()->Status); + req.SendResult(*ev->Get()->Record, ev->Get()->Status); } } }; @@ -86,30 +77,22 @@ private: using TFqPrivatePingTaskRPC = TFqPrivateRequestRPC< TEvFqPrivatePingTaskRequest, NYq::TEvents::TEvPingTaskRequest, - NYq::TEvents::TEvPingTaskResponse, - Yq::Private::PingTaskRequest, - Fq::Private::PingTaskResult>; + NYq::TEvents::TEvPingTaskResponse>; using TFqPrivateGetTaskRPC = TFqPrivateRequestRPC< TEvFqPrivateGetTaskRequest, NYq::TEvents::TEvGetTaskRequest, - NYq::TEvents::TEvGetTaskResponse, - Yq::Private::GetTaskRequest, - Fq::Private::GetTaskResult>; + NYq::TEvents::TEvGetTaskResponse>; using TFqPrivateWriteTaskResultRPC = TFqPrivateRequestRPC< TEvFqPrivateWriteTaskResultRequest, NYq::TEvents::TEvWriteTaskResultRequest, - NYq::TEvents::TEvWriteTaskResultResponse, - Yq::Private::WriteTaskResultRequest, - Fq::Private::WriteTaskResultResult>; + NYq::TEvents::TEvWriteTaskResultResponse>; using TFqPrivateNodesHealthCheckRPC = TFqPrivateRequestRPC< TEvFqPrivateNodesHealthCheckRequest, NYq::TEvents::TEvNodesHealthCheckRequest, - NYq::TEvents::TEvNodesHealthCheckResponse, - Yq::Private::NodesHealthCheckRequest, - Fq::Private::NodesHealthCheckResult>; + NYq::TEvents::TEvNodesHealthCheckResponse>; void DoFqPrivatePingTaskRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider&) { TActivationContext::AsActorContext().Register(new TFqPrivatePingTaskRPC(p.release())); diff --git a/ydb/core/grpc_services/service_analytics_internal.h b/ydb/core/grpc_services/service_analytics_internal.h deleted file mode 100644 index f3d1ed966d9..00000000000 --- a/ydb/core/grpc_services/service_analytics_internal.h +++ /dev/null @@ -1,17 +0,0 @@ -#pragma once - -#include <memory> - -namespace NKikimr { -namespace NGRpcService { - -class IRequestOpCtx; -class IFacilityProvider; - -void DoYqPrivatePingTaskRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoYqPrivateGetTaskRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoYqPrivateWriteTaskResultRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); -void DoYqPrivateNodesHealthCheckRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& facility); - -} // namespace NGRpcService -} // namespace NKikimr diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 68575bb8130..dfc7710007b 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -7,7 +7,6 @@ #include <ydb/core/grpc_services/grpc_request_proxy.h> #include <ydb/services/auth/grpc_service.h> #include <ydb/services/yq/grpc_service.h> -#include <ydb/services/yq/private_grpc.h> #include <ydb/services/fq/grpc_service.h> #include <ydb/services/fq/private_grpc.h> #include <ydb/services/cms/grpc_service.h> @@ -329,7 +328,6 @@ namespace Tests { GRpcServer->AddService(new NGRpcService::TGRpcMonitoringService(system, counters, grpcRequestProxyId)); if (Settings->EnableYq) { GRpcServer->AddService(new NGRpcService::TGRpcYandexQueryService(system, counters, grpcRequestProxyId)); - GRpcServer->AddService(new NGRpcService::TGRpcYqPrivateTaskService(system, counters, grpcRequestProxyId)); GRpcServer->AddService(new NGRpcService::TGRpcFederatedQueryService(system, counters, grpcRequestProxyId)); GRpcServer->AddService(new NGRpcService::TGRpcFqPrivateTaskService(system, counters, grpcRequestProxyId)); } diff --git a/ydb/core/yq/libs/actors/nodes_manager.cpp b/ydb/core/yq/libs/actors/nodes_manager.cpp index 692ec6a8e06..16e589a2d8f 100644 --- a/ydb/core/yq/libs/actors/nodes_manager.cpp +++ b/ydb/core/yq/libs/actors/nodes_manager.cpp @@ -57,7 +57,7 @@ public: , IcPort(icPort) , UseDataCenter(useDataCenter) , DataCenter(dataCenter) - , InternalServiceId(MakeInternalServiceActorId()) + , InternalServiceId(NFq::MakeInternalServiceActorId()) { InstanceId = GetGuidAsString(RandomProvider->GenUuid4()); @@ -173,7 +173,7 @@ private: hFunc(NDqs::TEvAllocateWorkersRequest, Handle) hFunc(NDqs::TEvFreeWorkersNotify, Handle) hFunc(NActors::TEvents::TEvUndelivered, OnUndelivered) - hFunc(TEvInternalService::TEvHealthCheckResponse, HandleResponse) + hFunc(NFq::TEvInternalService::TEvHealthCheckResponse, HandleResponse) ) void HandleWakeup(NActors::TEvents::TEvWakeup::TPtr& ev) { @@ -192,7 +192,7 @@ private: ServiceCounters.Counters->GetCounter("NodesHealthCheck", true)->Inc(); - Yq::Private::NodesHealthCheckRequest request; + Fq::Private::NodesHealthCheckRequest request; request.set_tenant(Tenant); auto& node = *request.mutable_node(); node.set_node_id(SelfId().NodeId()); @@ -203,7 +203,7 @@ private: node.set_memory_allocated(AtomicGet(WorkerManagerCounters.MkqlMemoryAllocated->GetAtomic())); node.set_interconnect_port(IcPort); node.set_data_center(DataCenter); - Send(InternalServiceId, new TEvInternalService::TEvHealthCheckRequest(request)); + Send(InternalServiceId, new NFq::TEvInternalService::TEvHealthCheckRequest(request)); } void OnUndelivered(NActors::TEvents::TEvUndelivered::TPtr&) { @@ -211,7 +211,7 @@ private: ServiceCounters.Counters->GetCounter("OnUndelivered", true)->Inc(); } - void HandleResponse(TEvInternalService::TEvHealthCheckResponse::TPtr& ev) { + void HandleResponse(NFq::TEvInternalService::TEvHealthCheckResponse::TPtr& ev) { try { const auto& status = ev->Get()->Status.GetStatus(); THolder<TEvInterconnect::TEvNodesInfo> nameServiceUpdateReq(new TEvInterconnect::TEvNodesInfo()); diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp index 0162b2c8fdf..c1eee984a66 100644 --- a/ydb/core/yq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp @@ -71,6 +71,7 @@ namespace NYq { using namespace NActors; using namespace NYql; +using namespace NFq; namespace { @@ -241,14 +242,14 @@ private: void GetPendingTask() { FetcherGeneration++; LOG_D("Request Private::GetTask" << ", Owner: " << GetOwnerId() << ", Host: " << HostName() << ", Tenant: " << TenantName); - Yq::Private::GetTaskRequest request; + Fq::Private::GetTaskRequest request; request.set_owner_id(GetOwnerId()); request.set_host(HostName()); request.set_tenant(TenantName); Send(InternalServiceId, new TEvInternalService::TEvGetTaskRequest(request)); } - void ProcessTask(const Yq::Private::GetTaskResult& result) { + void ProcessTask(const Fq::Private::GetTaskResult& result) { for (const auto& task : result.tasks()) { RunTask(task); } @@ -259,7 +260,7 @@ private: return FetcherGuid + ToString(FetcherGeneration); } - void RunTask(const Yq::Private::GetTaskResult::Task& task) { + void RunTask(const Fq::Private::GetTaskResult::Task& task) { LOG_D("NewTask:" << " Scope: " << task.scope() << " Id: " << task.query_id().value() diff --git a/ydb/core/yq/libs/actors/pinger.cpp b/ydb/core/yq/libs/actors/pinger.cpp index 9cd8b06258f..97c91cf397e 100644 --- a/ydb/core/yq/libs/actors/pinger.cpp +++ b/ydb/core/yq/libs/actors/pinger.cpp @@ -33,6 +33,7 @@ namespace NYq { using namespace NActors; using namespace NYql; +using namespace NFq; struct TEvPingResponse : public NActors::TEventLocal<TEvPingResponse, NActors::TEvents::TSystem::CallbackCompletion> { TPingTaskResult Result; @@ -381,10 +382,10 @@ private: if (!retry) { RetryState.Init(TActivationContext::Now(), StartLeaseTime, Config.PingPeriod); } - Ping(Yq::Private::PingTaskRequest(), ContinueLeaseRequestCookie); + Ping(Fq::Private::PingTaskRequest(), ContinueLeaseRequestCookie); } - void Ping(Yq::Private::PingTaskRequest request, ui64 cookie) { + void Ping(Fq::Private::PingTaskRequest request, ui64 cookie) { QueryCounters.SetUptimePublicAndServiceCounter((TInstant::Now() - CreatedAt).Seconds()); // Fill ids request.set_tenant(TenantName); diff --git a/ydb/core/yq/libs/actors/result_writer.cpp b/ydb/core/yq/libs/actors/result_writer.cpp index b3fee94c452..0f819203114 100644 --- a/ydb/core/yq/libs/actors/result_writer.cpp +++ b/ydb/core/yq/libs/actors/result_writer.cpp @@ -50,8 +50,8 @@ public: , TraceId(traceId) , Deadline(deadline) , ResultBytesLimit(resultBytesLimit) - , InternalServiceId(MakeInternalServiceActorId()) - { + , InternalServiceId(NFq::MakeInternalServiceActorId()) + { if (!ResultBytesLimit) { ResultBytesLimit = 20_MB; } @@ -73,7 +73,7 @@ private: HFunc(TEvReadyState, OnReadyState); HFunc(TEvQueryResponse, OnQueryResult); - hFunc(TEvInternalService::TEvWriteResultResponse, HandleResponse); + hFunc(NFq::TEvInternalService::TEvWriteResultResponse, HandleResponse); ) void PassAway() { @@ -118,7 +118,7 @@ private: void OnReadyState(TEvReadyState::TPtr&, const TActorContext&) { } - void HandleResponse(TEvInternalService::TEvWriteResultResponse::TPtr& ev) { + void HandleResponse(NFq::TEvInternalService::TEvWriteResultResponse::TPtr& ev) { const auto& issues = ev->Get()->Status.GetIssues(); if (issues) { SendIssuesAndSetErrorFlag(issues); @@ -175,8 +175,8 @@ private: Send(ev->Sender, res.Release()); } - Yq::Private::WriteTaskResultRequest CreateProtoRequestWithoutResultSet(ui64 startRowIndex) { - Yq::Private::WriteTaskResultRequest protoReq; + Fq::Private::WriteTaskResultRequest CreateProtoRequestWithoutResultSet(ui64 startRowIndex) { + Fq::Private::WriteTaskResultRequest protoReq; protoReq.set_owner_id(ResultId.Owner); protoReq.mutable_result_id()->set_value(ResultId.Id); protoReq.set_offset(startRowIndex); @@ -191,7 +191,7 @@ private: return; } ++InflightCounter; - Send(InternalServiceId, new TEvInternalService::TEvWriteResultRequest(std::move(ResultChunks[CurChunkInd++]))); + Send(InternalServiceId, new NFq::TEvInternalService::TEvWriteResultRequest(std::move(ResultChunks[CurChunkInd++]))); } void ConstructResults(const Ydb::ResultSet& resultSet, ui64 startRowIndex) { @@ -323,7 +323,7 @@ private: ui64 ResultBytesLimit; ui64 OccupiedSpace = 0; - TVector<Yq::Private::WriteTaskResultRequest> ResultChunks; + TVector<Fq::Private::WriteTaskResultRequest> ResultChunks; size_t CurChunkInd = 0; ui32 InflightCounter = 0; TActorId InternalServiceId; diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index da62ccd1162..c2aca7904e3 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -84,6 +84,7 @@ namespace NYq { using namespace NActors; using namespace NYql; using namespace NDqs; +using namespace NFq; namespace { @@ -473,7 +474,7 @@ private: Issues.AddIssue("Internal Error"); if (!ConsumersAreDeleted) { - for (const Yq::Private::TopicConsumer& c : Params.CreatedTopicConsumers) { + for (const Fq::Private::TopicConsumer& c : Params.CreatedTopicConsumers) { TransientIssues.AddIssue(TStringBuilder() << "Created read rule `" << c.consumer_name() << "` for topic `" << c.topic_path() << "` (database id " << c.database_id() << ") maybe was left undeleted: internal error occurred"); TransientIssues.back().Severity = NYql::TSeverityIds::S_WARNING; } @@ -706,7 +707,7 @@ private: } void Handle(TEvents::TEvRaiseTransientIssues::TPtr& ev) { - Yq::Private::PingTaskRequest request; + Fq::Private::PingTaskRequest request; NYql::IssuesToMessage(ev->Get()->TransientIssues, request.mutable_transient_issues()); @@ -722,7 +723,7 @@ private: } void UpdateAstAndPlan(const TString& plan, const TString& expr) { - Yq::Private::PingTaskRequest request; + Fq::Private::PingTaskRequest request; if (Compressor.IsEnabled()) { auto [astCompressionMethod, astCompressed] = Compressor.Compress(expr); request.mutable_ast_compressed()->set_method(astCompressionMethod); @@ -744,7 +745,7 @@ private: return; } - Yq::Private::PingTaskRequest request; + Fq::Private::PingTaskRequest request; request.set_result_set_count(UpdateResultIndices()); QueryStateUpdateRequest.set_result_set_count(UpdateResultIndices()); @@ -793,7 +794,7 @@ private: } void SetLoadFromCheckpointMode() { - Yq::Private::PingTaskRequest request; + Fq::Private::PingTaskRequest request; request.set_state_load_mode(YandexQuery::FROM_LAST_CHECKPOINT); request.mutable_disposition()->mutable_from_last_checkpoint(); @@ -1050,7 +1051,7 @@ private: void RunReadRulesDeletionActor() { TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials; credentials.reserve(Params.CreatedTopicConsumers.size()); - for (const Yq::Private::TopicConsumer& c : Params.CreatedTopicConsumers) { + for (const Fq::Private::TopicConsumer& c : Params.CreatedTopicConsumers) { if (const TString& tokenName = c.token_name()) { credentials.emplace_back( CreateCredentialsProviderFactoryForStructuredToken(Params.CredentialsFactory, FindTokenByName(tokenName), c.add_bearer_to_token())); @@ -1081,7 +1082,7 @@ private: { Params.Status = YandexQuery::QueryMeta::RUNNING; - Yq::Private::PingTaskRequest request; + Fq::Private::PingTaskRequest request; request.set_status(YandexQuery::QueryMeta::RUNNING); *request.mutable_started_at() = google::protobuf::util::TimeUtil::MillisecondsToTimestamp(Now().MilliSeconds()); Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, UpdateQueryInfoCookie); @@ -1322,7 +1323,7 @@ private: const YandexQuery::QueryMeta::ComputeStatus finalizingStatus = GetFinalizingStatus(); Params.Status = finalizingStatus; LOG_D("Write finalizing status: " << YandexQuery::QueryMeta::ComputeStatus_Name(finalizingStatus)); - Yq::Private::PingTaskRequest request; + Fq::Private::PingTaskRequest request; request.set_status(finalizingStatus); Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, SaveFinalizingStatusCookie); } @@ -1698,7 +1699,7 @@ private: ::NYql::NCommon::TServiceCounters QueryCounters; const ::NMonitoring::TDynamicCounters::TCounterPtr QueryUptime; bool EnableCheckpointCoordinator = false; - Yq::Private::PingTaskRequest QueryStateUpdateRequest; + Fq::Private::PingTaskRequest QueryStateUpdateRequest; const ui64 MaxTasksPerOperation; const TCompressor Compressor; diff --git a/ydb/core/yq/libs/actors/run_actor_params.cpp b/ydb/core/yq/libs/actors/run_actor_params.cpp index 02b07642f2a..93c96406b7f 100644 --- a/ydb/core/yq/libs/actors/run_actor_params.cpp +++ b/ydb/core/yq/libs/actors/run_actor_params.cpp @@ -41,7 +41,7 @@ TRunActorParams::TRunActorParams( TVector<YandexQuery::ResultSetMeta> resultSetMetas, TVector<TString> dqGraphs, int32_t dqGraphIndex, - TVector<Yq::Private::TopicConsumer> createdTopicConsumers, + TVector<Fq::Private::TopicConsumer> createdTopicConsumers, bool automatic, const TString& queryName, const TInstant& deadline, diff --git a/ydb/core/yq/libs/actors/run_actor_params.h b/ydb/core/yq/libs/actors/run_actor_params.h index 1c187833072..8e3e32ea592 100644 --- a/ydb/core/yq/libs/actors/run_actor_params.h +++ b/ydb/core/yq/libs/actors/run_actor_params.h @@ -56,7 +56,7 @@ struct TRunActorParams { // TODO2 : Change name TVector<YandexQuery::ResultSetMeta> resultSetMetas, TVector<TString> dqGraphs, int32_t dqGraphIndex, - TVector<Yq::Private::TopicConsumer> createdTopicConsumers, + TVector<Fq::Private::TopicConsumer> createdTopicConsumers, bool automatic, const TString& queryName, const TInstant& deadline, @@ -108,7 +108,7 @@ struct TRunActorParams { // TODO2 : Change name const TVector<YandexQuery::ResultSetMeta> ResultSetMetas; const TVector<TString> DqGraphs; const int32_t DqGraphIndex; - TVector<Yq::Private::TopicConsumer> CreatedTopicConsumers; + TVector<Fq::Private::TopicConsumer> CreatedTopicConsumers; bool Automatic = false; TString QueryName; diff --git a/ydb/core/yq/libs/control_plane_storage/events/events.h b/ydb/core/yq/libs/control_plane_storage/events/events.h index 43a3151f03c..f1b94aef6fb 100644 --- a/ydb/core/yq/libs/control_plane_storage/events/events.h +++ b/ydb/core/yq/libs/control_plane_storage/events/events.h @@ -8,7 +8,7 @@ #include <library/cpp/actors/core/events.h> #include <library/cpp/actors/interconnect/events_local.h> -#include <ydb/core/yq/libs/protos/yq_private.pb.h> +#include <ydb/core/yq/libs/protos/fq_private.pb.h> #include <ydb/public/api/protos/yq.pb.h> #include <ydb/public/sdk/cpp/client/ydb_params/params.h> @@ -362,7 +362,7 @@ struct TEvControlPlaneStorage { TEvWriteResultDataRequest() = default; explicit TEvWriteResultDataRequest( - Yq::Private::WriteTaskResultRequest&& request) + Fq::Private::WriteTaskResultRequest&& request) : Request(std::move(request)) {} @@ -371,13 +371,13 @@ struct TEvControlPlaneStorage { + Request.ByteSizeLong(); } - Yq::Private::WriteTaskResultRequest Request; + Fq::Private::WriteTaskResultRequest Request; }; struct TEvWriteResultDataResponse : NActors::TEventLocal<TEvWriteResultDataResponse, EvWriteResultDataResponse> { explicit TEvWriteResultDataResponse( - const Yq::Private::WriteTaskResultResult& record) + const Fq::Private::WriteTaskResultResult& record) : Record(record) {} @@ -393,7 +393,7 @@ struct TEvControlPlaneStorage { + GetDebugInfoByteSize(DebugInfo); } - Yq::Private::WriteTaskResultResult Record; + Fq::Private::WriteTaskResultResult Record; NYql::TIssues Issues; TDebugInfoPtr DebugInfo; }; @@ -403,7 +403,7 @@ struct TEvControlPlaneStorage { TEvGetTaskRequest() = default; explicit TEvGetTaskRequest( - Yq::Private::GetTaskRequest&& request) + Fq::Private::GetTaskRequest&& request) : Request(std::move(request)) {} @@ -412,13 +412,13 @@ struct TEvControlPlaneStorage { + Request.ByteSizeLong(); } - Yq::Private::GetTaskRequest Request; + Fq::Private::GetTaskRequest Request; }; struct TEvGetTaskResponse : NActors::TEventLocal<TEvGetTaskResponse, EvGetTaskResponse> { explicit TEvGetTaskResponse( - const Yq::Private::GetTaskResult& record) + const Fq::Private::GetTaskResult& record) : Record(record) {} @@ -434,7 +434,7 @@ struct TEvControlPlaneStorage { + GetDebugInfoByteSize(DebugInfo); } - Yq::Private::GetTaskResult Record; + Fq::Private::GetTaskResult Record; NYql::TIssues Issues; TDebugInfoPtr DebugInfo; }; @@ -466,7 +466,7 @@ struct TEvControlPlaneStorage { TEvPingTaskRequest() = default; explicit TEvPingTaskRequest( - Yq::Private::PingTaskRequest&& request) + Fq::Private::PingTaskRequest&& request) : Request(std::move(request)) {} @@ -475,13 +475,13 @@ struct TEvControlPlaneStorage { + Request.ByteSizeLong(); } - Yq::Private::PingTaskRequest Request; + Fq::Private::PingTaskRequest Request; }; struct TEvPingTaskResponse : NActors::TEventLocal<TEvPingTaskResponse, EvPingTaskResponse> { explicit TEvPingTaskResponse( - const Yq::Private::PingTaskResult& record) + const Fq::Private::PingTaskResult& record) : Record(record) {} @@ -497,17 +497,17 @@ struct TEvControlPlaneStorage { + GetDebugInfoByteSize(DebugInfo); } - Yq::Private::PingTaskResult Record; + Fq::Private::PingTaskResult Record; NYql::TIssues Issues; TDebugInfoPtr DebugInfo; }; struct TEvNodesHealthCheckRequest : NActors::TEventLocal<TEvNodesHealthCheckRequest, EvNodesHealthCheckRequest> { - + TEvNodesHealthCheckRequest() = default; explicit TEvNodesHealthCheckRequest( - Yq::Private::NodesHealthCheckRequest&& request) + Fq::Private::NodesHealthCheckRequest&& request) : Request(std::move(request)) {} @@ -516,13 +516,13 @@ struct TEvControlPlaneStorage { + Request.ByteSizeLong(); } - Yq::Private::NodesHealthCheckRequest Request; + Fq::Private::NodesHealthCheckRequest Request; }; struct TEvNodesHealthCheckResponse : NActors::TEventLocal<TEvNodesHealthCheckResponse, EvNodesHealthCheckResponse> { explicit TEvNodesHealthCheckResponse( - const Yq::Private::NodesHealthCheckResult& record) + const Fq::Private::NodesHealthCheckResult& record) : Record(record) {} @@ -538,7 +538,7 @@ struct TEvControlPlaneStorage { + GetDebugInfoByteSize(DebugInfo); } - Yq::Private::NodesHealthCheckResult Record; + Fq::Private::NodesHealthCheckResult Record; NYql::TIssues Issues; TDebugInfoPtr DebugInfo; }; diff --git a/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp b/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp index 65d65f7bcd8..85e7fd5f488 100644 --- a/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp +++ b/ydb/core/yq/libs/control_plane_storage/in_memory_control_plane_storage.cpp @@ -298,7 +298,7 @@ private: void Handle(TEvControlPlaneStorage::TEvGetTaskRequest::TPtr& ev) { CPS_LOG_I("GetTaskRequest"); - Yq::Private::GetTaskResult result; + Fq::Private::GetTaskResult result; auto event = std::make_unique<TEvControlPlaneStorage::TEvGetTaskResponse>(result); NActors::TActivationContext::ActorSystem()->Send(new IEventHandle(ev->Sender, SelfId(), event.release(), 0, ev->Cookie)); } @@ -307,7 +307,7 @@ private: { SendEmptyResponse< TEvControlPlaneStorage::TEvPingTaskRequest::TPtr, - Yq::Private::PingTaskResult, + Fq::Private::PingTaskResult, TEvControlPlaneStorage::TEvPingTaskResponse>(ev, "PingTaskRequest"); } @@ -315,7 +315,7 @@ private: { SendEmptyResponse< TEvControlPlaneStorage::TEvNodesHealthCheckRequest::TPtr, - Yq::Private::NodesHealthCheckResult, + Fq::Private::NodesHealthCheckResult, TEvControlPlaneStorage::TEvNodesHealthCheckResponse>(ev, "NodesHealthCheckRequest"); } diff --git a/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp b/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp index fafade39a1c..9833d5f8cc3 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/nodes_health_check.cpp @@ -38,7 +38,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth return; } - std::shared_ptr<Yq::Private::NodesHealthCheckResult> response = std::make_shared<Yq::Private::NodesHealthCheckResult>(); + std::shared_ptr<Fq::Private::NodesHealthCheckResult> response = std::make_shared<Fq::Private::NodesHealthCheckResult>(); { auto* node = response->add_nodes(); node->set_node_id(nodeId); @@ -109,7 +109,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvNodesHealth auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; TAsyncStatus status = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery.Sql, readQuery.Params, prepareParams, requestCounters, debugInfo); auto prepare = [response] { return *response; }; - auto success = SendResponse<TEvControlPlaneStorage::TEvNodesHealthCheckResponse, Yq::Private::NodesHealthCheckResult>( + auto success = SendResponse<TEvControlPlaneStorage::TEvNodesHealthCheckResponse, Fq::Private::NodesHealthCheckResult>( "NodesHealthCheckRequest - NodesHealthCheckResult", NActors::TActivationContext::ActorSystem(), status, diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp index 6fc5800b1fe..18f7295aa33 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp @@ -317,8 +317,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ }); auto prepare = [response] { - - Yq::Private::GetTaskResult result; + Fq::Private::GetTaskResult result; const auto& tasks = std::get<0>(*response); for (const auto& task : tasks) { @@ -383,7 +382,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ return result; }; - auto success = SendResponse<TEvControlPlaneStorage::TEvGetTaskResponse, Yq::Private::GetTaskResult> + auto success = SendResponse<TEvControlPlaneStorage::TEvGetTaskResponse, Fq::Private::GetTaskResult> ("GetTaskRequest - GetTaskResult", NActors::TActivationContext::ActorSystem(), result, diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp index 09f4d25bb46..49b1a9681e7 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp @@ -18,7 +18,7 @@ bool IsFinishedStatus(YandexQuery::QueryMeta::ComputeStatus status) { } // namespace std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>> ConstructHardPingTask( - const Yq::Private::PingTaskRequest& request, std::shared_ptr<Yq::Private::PingTaskResult> response, + const Fq::Private::PingTaskRequest& request, std::shared_ptr<Fq::Private::PingTaskResult> response, const TString& tablePathPrefix, const TDuration& automaticQueriesTtl, const TDuration& taskLeaseTtl, const THashMap<ui64, TRetryPolicyItem>& retryPolicies) { TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "HardPingTask(read)"); @@ -240,7 +240,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam } if (!request.created_topic_consumers().empty()) { - std::set<Yq::Private::TopicConsumer, TTopicConsumerLess> mergedConsumers; + std::set<Fq::Private::TopicConsumer, TTopicConsumerLess> mergedConsumers; for (auto&& c : *internal.mutable_created_topic_consumers()) { mergedConsumers.emplace(std::move(c)); } @@ -344,7 +344,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam } std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParams>(const TVector<NYdb::TResultSet>&)>> ConstructSoftPingTask( - const Yq::Private::PingTaskRequest& request, std::shared_ptr<Yq::Private::PingTaskResult> response, + const Fq::Private::PingTaskRequest& request, std::shared_ptr<Fq::Private::PingTaskResult> response, const TString& tablePathPrefix, const TDuration& taskLeaseTtl) { TSqlQueryBuilder readQueryBuilder(tablePathPrefix, "SoftPingTask(read)"); readQueryBuilder.AddString("tenant", request.tenant()); @@ -410,7 +410,7 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskRequest::TPtr& ev) { TInstant startTime = TInstant::Now(); - Yq::Private::PingTaskRequest& request = ev->Get()->Request; + Fq::Private::PingTaskRequest& request = ev->Get()->Request; const TString cloudId = ""; const TString scope = request.scope(); TRequestCountersPtr requestCounters = Counters.GetScopeCounters("" /*CloudId*/, scope, RTS_PING_TASK); @@ -431,7 +431,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq return; } - std::shared_ptr<Yq::Private::PingTaskResult> response = std::make_shared<Yq::Private::PingTaskResult>(); + std::shared_ptr<Fq::Private::PingTaskResult> response = std::make_shared<Fq::Private::PingTaskResult>(); if (request.status()) Counters.GetFinalStatusCounters(cloudId, scope)->IncByStatus(request.status()); @@ -445,7 +445,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvPingTaskReq auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; auto result = ReadModifyWrite(NActors::TActivationContext::ActorSystem(), readQuery, readParams, prepareParams, requestCounters, debugInfo); auto prepare = [response] { return *response; }; - auto success = SendResponse<TEvControlPlaneStorage::TEvPingTaskResponse, Yq::Private::PingTaskResult>( + auto success = SendResponse<TEvControlPlaneStorage::TEvPingTaskResponse, Fq::Private::PingTaskResult>( "PingTaskRequest - PingTaskResult", NActors::TActivationContext::ActorSystem(), result, diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp index 1b94ea0d451..7d822f4f471 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_result_write.cpp @@ -29,7 +29,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvWriteResult return; } - std::shared_ptr<Yq::Private::WriteTaskResultResult> response = std::make_shared<Yq::Private::WriteTaskResultResult>(); + std::shared_ptr<Fq::Private::WriteTaskResultResult> response = std::make_shared<Fq::Private::WriteTaskResultResult>(); response->set_request_id(request.request_id()); NYdb::TValueBuilder itemsAsList; @@ -68,7 +68,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvWriteResult auto debugInfo = Config.Proto.GetEnableDebugMode() ? std::make_shared<TDebugInfo>() : TDebugInfoPtr{}; TAsyncStatus result = Write(NActors::TActivationContext::ActorSystem(), query.Sql, query.Params, requestCounters, debugInfo); auto prepare = [response] { return *response; }; - auto success = SendResponse<TEvControlPlaneStorage::TEvWriteResultDataResponse, Yq::Private::WriteTaskResultResult>( + auto success = SendResponse<TEvControlPlaneStorage::TEvWriteResultDataResponse, Fq::Private::WriteTaskResultResult>( "WriteResultDataRequest - WriteResultDataResult", NActors::TActivationContext::ActorSystem(), result, diff --git a/ydb/core/yq/libs/control_plane_storage/internal/utils.h b/ydb/core/yq/libs/control_plane_storage/internal/utils.h index 2fd0f708d13..6318afb678a 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/utils.h +++ b/ydb/core/yq/libs/control_plane_storage/internal/utils.h @@ -12,7 +12,7 @@ namespace NYq { struct TTopicConsumerLess { - bool operator()(const Yq::Private::TopicConsumer& c1, const Yq::Private::TopicConsumer& c2) const { + bool operator()(const Fq::Private::TopicConsumer& c1, const Fq::Private::TopicConsumer& c2) const { // Cluster endpoint/use ssl are not in key return std::tie(c1.database_id(), c1.database(), c1.topic_path(), c1.consumer_name()) < std::tie(c2.database_id(), c2.database(), c2.topic_path(), c2.consumer_name()); } diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders.h b/ydb/core/yq/libs/control_plane_storage/message_builders.h index 09cfddad7f3..9b92c7c14f5 100644 --- a/ydb/core/yq/libs/control_plane_storage/message_builders.h +++ b/ydb/core/yq/libs/control_plane_storage/message_builders.h @@ -1174,7 +1174,7 @@ public: std::unique_ptr<NYq::TEvControlPlaneStorage::TEvPingTaskRequest> Build() { - Yq::Private::PingTaskRequest request; + Fq::Private::PingTaskRequest request; request.set_owner_id(Owner); request.mutable_query_id()->set_value(QueryId); request.mutable_result_id()->set_value(ResultId); @@ -1291,7 +1291,7 @@ public: std::unique_ptr<NYq::TEvControlPlaneStorage::TEvNodesHealthCheckRequest> Build() { - Yq::Private::NodesHealthCheckRequest request; + Fq::Private::NodesHealthCheckRequest request; request.set_tenant(TenantName); auto& node = *request.mutable_node(); node.set_node_id(NodeId); diff --git a/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h b/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h index e0fc67201e1..754c432fe79 100644 --- a/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h +++ b/ydb/core/yq/libs/control_plane_storage/message_builders_yq.h @@ -1174,7 +1174,7 @@ public: std::unique_ptr<TEvControlPlaneStorage::TEvPingTaskRequest> Build() { - Yq::Private::PingTaskRequest request; + Fq::Private::PingTaskRequest request; request.set_owner_id(Owner); request.mutable_query_id()->set_value(QueryId); request.mutable_result_id()->set_value(ResultId); @@ -1289,7 +1289,7 @@ public: std::unique_ptr<TEvControlPlaneStorage::TEvNodesHealthCheckRequest> Build() { - Yq::Private::NodesHealthCheckRequest request; + Fq::Private::NodesHealthCheckRequest request; request.set_tenant(TenantName); auto& node = *request.mutable_node(); node.set_node_id(NodeId); diff --git a/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto index 0d17e1fc9a2..a451d761f99 100644 --- a/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto +++ b/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto @@ -8,7 +8,7 @@ option java_outer_classname = "YandexQueryInternalProtos"; import "ydb/library/yql/providers/dq/api/protos/service.proto"; import "ydb/library/yql/dq/proto/dq_tasks.proto"; import "ydb/public/api/protos/yq.proto"; -import "ydb/core/yq/libs/protos/yq_private.proto"; +import "ydb/core/yq/libs/protos/fq_private.proto"; import "google/protobuf/duration.proto"; @@ -28,21 +28,21 @@ message QueryInternal { ExecuteMode execute_mode = 9; StateLoadMode state_load_mode = 10; string cloud_id = 11; - repeated Yq.Private.TopicConsumer created_topic_consumers = 12; + repeated Fq.Private.TopicConsumer created_topic_consumers = 12; repeated bytes dq_graph = 13; // deprecated: use dq_graph_compressed int32 dq_graph_index = 14; StreamingDisposition disposition = 15; uint64 result_limit = 16; google.protobuf.Duration execution_ttl = 17; - Yq.Private.CompressedData ast_compressed = 18; - Yq.Private.CompressedData plan_compressed = 19; - repeated Yq.Private.CompressedData dq_graph_compressed = 20; + Fq.Private.CompressedData ast_compressed = 18; + Fq.Private.CompressedData plan_compressed = 19; + repeated Fq.Private.CompressedData dq_graph_compressed = 20; } message JobInternal { - Yq.Private.CompressedData ast_compressed = 1; - Yq.Private.CompressedData plan_compressed = 2; - repeated Yq.Private.CompressedData dq_graph_compressed = 3; + Fq.Private.CompressedData ast_compressed = 1; + Fq.Private.CompressedData plan_compressed = 2; + repeated Fq.Private.CompressedData dq_graph_compressed = 3; } message ConnectionInternal { diff --git a/ydb/core/yq/libs/control_plane_storage/util.cpp b/ydb/core/yq/libs/control_plane_storage/util.cpp index f73e0e4ce94..899bbfc7a3b 100644 --- a/ydb/core/yq/libs/control_plane_storage/util.cpp +++ b/ydb/core/yq/libs/control_plane_storage/util.cpp @@ -140,7 +140,7 @@ NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlane return config; } -bool DoesPingTaskUpdateQueriesTable(const Yq::Private::PingTaskRequest& request) { +bool DoesPingTaskUpdateQueriesTable(const Fq::Private::PingTaskRequest& request) { return request.status() != YandexQuery::QueryMeta::COMPUTE_STATUS_UNSPECIFIED || !request.issues().empty() || !request.transient_issues().empty() diff --git a/ydb/core/yq/libs/control_plane_storage/util.h b/ydb/core/yq/libs/control_plane_storage/util.h index d14f27bcc87..39226881d25 100644 --- a/ydb/core/yq/libs/control_plane_storage/util.h +++ b/ydb/core/yq/libs/control_plane_storage/util.h @@ -38,7 +38,7 @@ TDuration GetDuration(const TString& value, const TDuration& defaultValue); NConfig::TControlPlaneStorageConfig FillDefaultParameters(NConfig::TControlPlaneStorageConfig config); -bool DoesPingTaskUpdateQueriesTable(const Yq::Private::PingTaskRequest& request); +bool DoesPingTaskUpdateQueriesTable(const Fq::Private::PingTaskRequest& request); NYdb::TValue PackItemsToList(const TVector<NYdb::TValue>& items); diff --git a/ydb/core/yq/libs/events/events.h b/ydb/core/yq/libs/events/events.h index 4179ea23049..72c4fd73ed6 100644 --- a/ydb/core/yq/libs/events/events.h +++ b/ydb/core/yq/libs/events/events.h @@ -7,7 +7,7 @@ #include <ydb/library/yql/public/issue/yql_issue.h> #include <ydb/core/yq/libs/graph_params/proto/graph_params.pb.h> -#include <ydb/core/yq/libs/protos/yq_private.pb.h> +#include <ydb/core/yq/libs/protos/fq_private.pb.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/public/lib/yq/scope.h> @@ -34,47 +34,47 @@ struct TEvents { }; struct TEvPingTaskRequest : NActors::TEventLocal<TEvPingTaskRequest, TEventIds::EvPingTaskRequest>, TEvAnalyticsBase { - Yq::Private::PingTaskRequest Record; + Fq::Private::PingTaskRequest Record; TString User; }; struct TEvPingTaskResponse : NActors::TEventLocal<TEvPingTaskResponse, TEventIds::EvPingTaskResponse> { Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; - TMaybe<Yq::Private::PingTaskResult> Record; + TMaybe<Fq::Private::PingTaskResult> Record; }; struct TEvGetTaskRequest : NActors::TEventLocal<TEvGetTaskRequest, TEventIds::EvGetTaskRequest>, TEvAnalyticsBase { - Yq::Private::GetTaskRequest Record; + Fq::Private::GetTaskRequest Record; TString User; }; struct TEvGetTaskResponse : NActors::TEventLocal<TEvGetTaskResponse, TEventIds::EvGetTaskResponse> { Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; - TMaybe<Yq::Private::GetTaskResult> Record; + TMaybe<Fq::Private::GetTaskResult> Record; }; struct TEvWriteTaskResultRequest : NActors::TEventLocal<TEvWriteTaskResultRequest, TEventIds::EvWriteTaskResultRequest>, TEvAnalyticsBase { - Yq::Private::WriteTaskResultRequest Record; + Fq::Private::WriteTaskResultRequest Record; TString User; }; struct TEvWriteTaskResultResponse : NActors::TEventLocal<TEvWriteTaskResultResponse, TEventIds::EvWriteTaskResultResponse> { Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; - TMaybe<Yq::Private::WriteTaskResultResult> Record; + TMaybe<Fq::Private::WriteTaskResultResult> Record; }; struct TEvNodesHealthCheckRequest : NActors::TEventLocal<TEvNodesHealthCheckRequest, TEventIds::EvNodesHealthCheckRequest>{ - Yq::Private::NodesHealthCheckRequest Record; + Fq::Private::NodesHealthCheckRequest Record; TString User; }; struct TEvNodesHealthCheckResponse : NActors::TEventLocal<TEvNodesHealthCheckResponse, TEventIds::EvNodesHealthCheckResponse> { Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; - TMaybe<Yq::Private::NodesHealthCheckResult> Record; + TMaybe<Fq::Private::NodesHealthCheckResult> Record; }; struct TEvAsyncContinue : NActors::TEventLocal<TEvAsyncContinue, TEventIds::EvAsyncContinue> { @@ -180,12 +180,12 @@ struct TEvents { }; struct TEvForwardPingRequest : NActors::TEventLocal<TEvForwardPingRequest, TEventIds::EvForwardPingRequest> { - explicit TEvForwardPingRequest(const Yq::Private::PingTaskRequest& request, bool final = false) + explicit TEvForwardPingRequest(const Fq::Private::PingTaskRequest& request, bool final = false) : Request(request) , Final(final) { } - Yq::Private::PingTaskRequest Request; + Fq::Private::PingTaskRequest Request; bool Final; // Is this the last ping request. }; diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index d3e57645317..f10fd950345 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -192,14 +192,14 @@ void Init( if (protoConfig.GetNodesManager().GetEnabled() || protoConfig.GetPendingFetcher().GetEnabled()) { auto internal = protoConfig.GetPrivateApi().GetLoopback() - ? CreateLoopbackServiceActor(clientCounters) - : CreateInternalServiceActor( + ? NFq::CreateLoopbackServiceActor(clientCounters) + : NFq::CreateInternalServiceActor( yqSharedResources, credentialsProviderFactory, protoConfig.GetPrivateApi(), clientCounters ); - actorRegistrator(MakeInternalServiceActorId(), internal); + actorRegistrator(NFq::MakeInternalServiceActorId(), internal); } if (protoConfig.GetNodesManager().GetEnabled()) { diff --git a/ydb/core/yq/libs/private_client/CMakeLists.txt b/ydb/core/yq/libs/private_client/CMakeLists.txt index 64e19f535e1..0f5f23c5b4d 100644 --- a/ydb/core/yq/libs/private_client/CMakeLists.txt +++ b/ydb/core/yq/libs/private_client/CMakeLists.txt @@ -25,6 +25,5 @@ target_link_libraries(yq-libs-private_client PUBLIC target_sources(yq-libs-private_client PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/internal_service.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/loopback_service.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/private_client_fq.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/private_client.cpp ) diff --git a/ydb/core/yq/libs/private_client/events.h b/ydb/core/yq/libs/private_client/events.h index 073a097435a..e6cdab10c67 100644 --- a/ydb/core/yq/libs/private_client/events.h +++ b/ydb/core/yq/libs/private_client/events.h @@ -13,7 +13,7 @@ #include <ydb/core/yq/libs/control_plane_storage/proto/yq_internal.pb.h> -namespace NYq { +namespace NFq { struct TEvInternalService { // Event ids. @@ -35,26 +35,26 @@ struct TEvInternalService { struct TInternalServiceRequestEvent : public NActors::TEventLocal<TInternalServiceRequestEvent<TProtoRequest, TEventType>, TEventType> { TProtoRequest Request; TInstant SentAt; - explicit TInternalServiceRequestEvent(const TProtoRequest& request) + explicit TInternalServiceRequestEvent(const TProtoRequest& request) : Request(request), SentAt(Now()) { } }; - using TEvHealthCheckRequest = TInternalServiceRequestEvent<Yq::Private::NodesHealthCheckRequest, EvHealthCheckRequest>; - using TEvGetTaskRequest = TInternalServiceRequestEvent<Yq::Private::GetTaskRequest, EvGetTaskRequest>; - using TEvPingTaskRequest = TInternalServiceRequestEvent<Yq::Private::PingTaskRequest, EvPingTaskRequest>; - using TEvWriteResultRequest = TInternalServiceRequestEvent<Yq::Private::WriteTaskResultRequest, EvWriteResultRequest>; + using TEvHealthCheckRequest = TInternalServiceRequestEvent<Fq::Private::NodesHealthCheckRequest, EvHealthCheckRequest>; + using TEvGetTaskRequest = TInternalServiceRequestEvent<Fq::Private::GetTaskRequest, EvGetTaskRequest>; + using TEvPingTaskRequest = TInternalServiceRequestEvent<Fq::Private::PingTaskRequest, EvPingTaskRequest>; + using TEvWriteResultRequest = TInternalServiceRequestEvent<Fq::Private::WriteTaskResultRequest, EvWriteResultRequest>; template <class TProtoResult, ui32 TEventType> struct TInternalServiceResponseEvent : public NActors::TEventLocal<TInternalServiceResponseEvent<TProtoResult, TEventType>, TEventType> { NYdb::TStatus Status; TProtoResult Result; - explicit TInternalServiceResponseEvent(const TProtoResultInternalWrapper<TProtoResult>& wrappedResult) : Status(wrappedResult) { + explicit TInternalServiceResponseEvent(const TProtoResultInternalWrapper<TProtoResult>& wrappedResult) : Status(wrappedResult) { if (wrappedResult.IsResultSet()) { Result = wrappedResult.GetResult(); } } - explicit TInternalServiceResponseEvent(const TString& errorMessage) : Status(NYdb::EStatus::INTERNAL_ERROR, {NYql::TIssue(errorMessage).SetCode(NYql::UNEXPECTED_ERROR, NYql::TSeverityIds::S_ERROR)}) { + explicit TInternalServiceResponseEvent(const TString& errorMessage) : Status(NYdb::EStatus::INTERNAL_ERROR, {NYql::TIssue(errorMessage).SetCode(NYql::UNEXPECTED_ERROR, NYql::TSeverityIds::S_ERROR)}) { } explicit TInternalServiceResponseEvent(const TProtoResult& result) : Status(NYdb::EStatus::SUCCESS, NYql::TIssues()), Result(result) { } @@ -62,12 +62,12 @@ struct TEvInternalService { } }; - using TEvHealthCheckResponse = TInternalServiceResponseEvent<Yq::Private::NodesHealthCheckResult, EvHealthCheckResponse>; - using TEvGetTaskResponse = TInternalServiceResponseEvent<Yq::Private::GetTaskResult, EvGetTaskResponse>; - using TEvPingTaskResponse = TInternalServiceResponseEvent<Yq::Private::PingTaskResult, EvPingTaskResponse>; - using TEvWriteResultResponse = TInternalServiceResponseEvent<Yq::Private::WriteTaskResultResult, EvWriteResultResponse>; + using TEvHealthCheckResponse = TInternalServiceResponseEvent<Fq::Private::NodesHealthCheckResult, EvHealthCheckResponse>; + using TEvGetTaskResponse = TInternalServiceResponseEvent<Fq::Private::GetTaskResult, EvGetTaskResponse>; + using TEvPingTaskResponse = TInternalServiceResponseEvent<Fq::Private::PingTaskResult, EvPingTaskResponse>; + using TEvWriteResultResponse = TInternalServiceResponseEvent<Fq::Private::WriteTaskResultResult, EvWriteResultResponse>; }; NActors::TActorId MakeInternalServiceActorId(); -} /* NYq */ +} /* NFq */ diff --git a/ydb/core/yq/libs/private_client/internal_service.cpp b/ydb/core/yq/libs/private_client/internal_service.cpp index ba013d5d86e..bb214525114 100644 --- a/ydb/core/yq/libs/private_client/internal_service.cpp +++ b/ydb/core/yq/libs/private_client/internal_service.cpp @@ -16,7 +16,7 @@ #define LOG_D(stream) \ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::FQ_INTERNAL_SERVICE, stream) -namespace NYq { +namespace NFq { NActors::TActorId MakeInternalServiceActorId() { constexpr TStringBuf name = "FQINTSRV"; @@ -123,4 +123,4 @@ NActors::IActor* CreateInternalServiceActor( return new TInternalService(yqSharedResources, credentialsProviderFactory, privateApiConfig, counters); } -} /* NYq */ +} /* NFq */ diff --git a/ydb/core/yq/libs/private_client/internal_service.h b/ydb/core/yq/libs/private_client/internal_service.h index 145838e95c2..2bad41f7cbc 100644 --- a/ydb/core/yq/libs/private_client/internal_service.h +++ b/ydb/core/yq/libs/private_client/internal_service.h @@ -14,7 +14,7 @@ #include <ydb/core/yq/libs/control_plane_storage/proto/yq_internal.pb.h> -namespace NYq { +namespace NFq { NActors::IActor* CreateInternalServiceActor( const NYq::TYqSharedResources::TPtr& yqSharedResources, @@ -22,4 +22,4 @@ NActors::IActor* CreateInternalServiceActor( const NYq::NConfig::TPrivateApiConfig& privateApiConfig, const ::NMonitoring::TDynamicCounterPtr& counters); -} /* NYq */ +} /* NFq */ diff --git a/ydb/core/yq/libs/private_client/loopback_service.cpp b/ydb/core/yq/libs/private_client/loopback_service.cpp index 8a87d2cc405..9357e99034e 100644 --- a/ydb/core/yq/libs/private_client/loopback_service.cpp +++ b/ydb/core/yq/libs/private_client/loopback_service.cpp @@ -18,7 +18,7 @@ #define LOG_D(stream) \ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::FQ_INTERNAL_SERVICE, stream) -namespace NYq { +namespace NFq { class TLoopbackService : public NActors::TActorBootstrapped<TLoopbackService> { public: @@ -144,4 +144,4 @@ NActors::IActor* CreateLoopbackServiceActor( return new TLoopbackService(counters); } -} /* NYq */ +} /* NFq */ diff --git a/ydb/core/yq/libs/private_client/loopback_service.h b/ydb/core/yq/libs/private_client/loopback_service.h index 8a3819df7a1..e722d0b5274 100644 --- a/ydb/core/yq/libs/private_client/loopback_service.h +++ b/ydb/core/yq/libs/private_client/loopback_service.h @@ -4,9 +4,9 @@ #include <library/cpp/monlib/dynamic_counters/counters.h> -namespace NYq { +namespace NFq { NActors::IActor* CreateLoopbackServiceActor( const ::NMonitoring::TDynamicCounterPtr& counters); -} /* NYq */ +} /* NFq */ diff --git a/ydb/core/yq/libs/private_client/private_client.cpp b/ydb/core/yq/libs/private_client/private_client.cpp index d9fb1562ee8..269ea674a1b 100644 --- a/ydb/core/yq/libs/private_client/private_client.cpp +++ b/ydb/core/yq/libs/private_client/private_client.cpp @@ -3,7 +3,7 @@ #include <ydb/library/yql/public/issue/yql_issue_message.h> #include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> -namespace NYq { +namespace NFq { using namespace NYdb; @@ -12,7 +12,7 @@ public: TImpl( std::shared_ptr<TGRpcConnectionsImpl>&& connections, const TCommonClientSettings& settings, - const ::NMonitoring::TDynamicCounterPtr& counters) + const NMonitoring::TDynamicCounterPtr& counters) : TClientImplCommon(std::move(connections), settings) , Counters(counters->GetSubgroup("subsystem", "private_api")->GetSubgroup("subcomponent", "ClientMetrics")) , PingTaskTime(Counters->GetHistogram("PingTaskMs", NMonitoring::ExponentialHistogram(10, 2, 50))) @@ -41,23 +41,23 @@ public: } TAsyncPingTaskResult PingTask( - Yq::Private::PingTaskRequest&& request, + Fq::Private::PingTaskRequest&& request, const TPingTaskSettings& settings) { const auto startedAt = TInstant::Now(); auto promise = NThreading::NewPromise<TPingTaskResult>(); auto future = promise.GetFuture(); auto extractor = MakeResultExtractor< - Yq::Private::PingTaskResult, + Fq::Private::PingTaskResult, TPingTaskResult>(std::move(promise), PingTaskTime, startedAt); Connections_->RunDeferred< - Yq::Private::V1::YqPrivateTaskService, - Yq::Private::PingTaskRequest, - Yq::Private::PingTaskResponse>( + Fq::Private::V1::FqPrivateTaskService, + Fq::Private::PingTaskRequest, + Fq::Private::PingTaskResponse>( std::move(request), std::move(extractor), - &Yq::Private::V1::YqPrivateTaskService::Stub::AsyncPingTask, + &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncPingTask, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, TRpcRequestSettings::Make(settings), @@ -67,22 +67,22 @@ public: } TAsyncGetTaskResult GetTask( - Yq::Private::GetTaskRequest&& request, + Fq::Private::GetTaskRequest&& request, const TGetTaskSettings& settings) { const auto startedAt = TInstant::Now(); auto promise = NThreading::NewPromise<TGetTaskResult>(); auto future = promise.GetFuture(); auto extractor = MakeResultExtractor< - Yq::Private::GetTaskResult, + Fq::Private::GetTaskResult, TGetTaskResult>(std::move(promise), GetTaskTime, startedAt); Connections_->RunDeferred< - Yq::Private::V1::YqPrivateTaskService, - Yq::Private::GetTaskRequest, - Yq::Private::GetTaskResponse>( + Fq::Private::V1::FqPrivateTaskService, + Fq::Private::GetTaskRequest, + Fq::Private::GetTaskResponse>( std::move(request), std::move(extractor), - &Yq::Private::V1::YqPrivateTaskService::Stub::AsyncGetTask, + &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncGetTask, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, TRpcRequestSettings::Make(settings), @@ -92,22 +92,22 @@ public: } TAsyncWriteTaskResult WriteTaskResult( - Yq::Private::WriteTaskResultRequest&& request, + Fq::Private::WriteTaskResultRequest&& request, const TWriteTaskResultSettings& settings) { const auto startedAt = TInstant::Now(); auto promise = NThreading::NewPromise<TWriteTaskResult>(); auto future = promise.GetFuture(); auto extractor = MakeResultExtractor< - Yq::Private::WriteTaskResultResult, + Fq::Private::WriteTaskResultResult, TWriteTaskResult>(std::move(promise), WriteTaskResultTime, startedAt); Connections_->RunDeferred< - Yq::Private::V1::YqPrivateTaskService, - Yq::Private::WriteTaskResultRequest, - Yq::Private::WriteTaskResultResponse>( + Fq::Private::V1::FqPrivateTaskService, + Fq::Private::WriteTaskResultRequest, + Fq::Private::WriteTaskResultResponse>( std::move(request), std::move(extractor), - &Yq::Private::V1::YqPrivateTaskService::Stub::AsyncWriteTaskResult, + &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncWriteTaskResult, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, TRpcRequestSettings::Make(settings), @@ -117,22 +117,22 @@ public: } TAsyncNodesHealthCheckResult NodesHealthCheck( - Yq::Private::NodesHealthCheckRequest&& request, + Fq::Private::NodesHealthCheckRequest&& request, const TNodesHealthCheckSettings& settings) { const auto startedAt = TInstant::Now(); auto promise = NThreading::NewPromise<TNodesHealthCheckResult>(); auto future = promise.GetFuture(); auto extractor = MakeResultExtractor< - Yq::Private::NodesHealthCheckResult, + Fq::Private::NodesHealthCheckResult, TNodesHealthCheckResult>(std::move(promise), NodesHealthCheckTime, startedAt); Connections_->RunDeferred< - Yq::Private::V1::YqPrivateTaskService, - Yq::Private::NodesHealthCheckRequest, - Yq::Private::NodesHealthCheckResponse>( + Fq::Private::V1::FqPrivateTaskService, + Fq::Private::NodesHealthCheckRequest, + Fq::Private::NodesHealthCheckResponse>( std::move(request), std::move(extractor), - &Yq::Private::V1::YqPrivateTaskService::Stub::AsyncNodesHealthCheck, + &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncNodesHealthCheck, DbDriverState_, INITIAL_DEFERRED_CALL_DELAY, TRpcRequestSettings::Make(settings), @@ -141,7 +141,7 @@ public: return future; } private: - const ::NMonitoring::TDynamicCounterPtr Counters; + const NMonitoring::TDynamicCounterPtr Counters; const NMonitoring::THistogramPtr PingTaskTime; const NMonitoring::THistogramPtr GetTaskTime; const NMonitoring::THistogramPtr WriteTaskResultTime; @@ -151,33 +151,32 @@ private: TPrivateClient::TPrivateClient( const TDriver& driver, const TCommonClientSettings& settings, - const ::NMonitoring::TDynamicCounterPtr& counters) + const NMonitoring::TDynamicCounterPtr& counters) : Impl(new TImpl(CreateInternalInterface(driver), settings, counters)) {} TAsyncPingTaskResult TPrivateClient::PingTask( - Yq::Private::PingTaskRequest&& request, + Fq::Private::PingTaskRequest&& request, const TPingTaskSettings& settings) { return Impl->PingTask(std::move(request), settings); } TAsyncGetTaskResult TPrivateClient::GetTask( - Yq::Private::GetTaskRequest&& request, + Fq::Private::GetTaskRequest&& request, const TGetTaskSettings& settings) { return Impl->GetTask(std::move(request), settings); } TAsyncWriteTaskResult TPrivateClient::WriteTaskResult( - Yq::Private::WriteTaskResultRequest&& request, + Fq::Private::WriteTaskResultRequest&& request, const TWriteTaskResultSettings& settings) { return Impl->WriteTaskResult(std::move(request), settings); } TAsyncNodesHealthCheckResult TPrivateClient::NodesHealthCheck( - Yq::Private::NodesHealthCheckRequest&& request, + Fq::Private::NodesHealthCheckRequest&& request, const TNodesHealthCheckSettings& settings) { return Impl->NodesHealthCheck(std::move(request), settings); } -} //NYq - +} //NFq diff --git a/ydb/core/yq/libs/private_client/private_client.h b/ydb/core/yq/libs/private_client/private_client.h index b932e8410a4..d697f417522 100644 --- a/ydb/core/yq/libs/private_client/private_client.h +++ b/ydb/core/yq/libs/private_client/private_client.h @@ -4,8 +4,9 @@ #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/public/api/grpc/draft/yql_db_v1.grpc.pb.h> +#include <ydb/core/yq/libs/protos/fq_private.pb.h> -namespace NYq { +namespace NFq { template<class TProtoResult> class TProtoResultInternalWrapper : public NYdb::TStatus { @@ -34,10 +35,10 @@ private: }; -using TGetTaskResult = TProtoResultInternalWrapper<Yq::Private::GetTaskResult>; -using TPingTaskResult = TProtoResultInternalWrapper<Yq::Private::PingTaskResult>; -using TWriteTaskResult = TProtoResultInternalWrapper<Yq::Private::WriteTaskResultResult>; -using TNodesHealthCheckResult = TProtoResultInternalWrapper<Yq::Private::NodesHealthCheckResult>; +using TGetTaskResult = TProtoResultInternalWrapper<Fq::Private::GetTaskResult>; +using TPingTaskResult = TProtoResultInternalWrapper<Fq::Private::PingTaskResult>; +using TWriteTaskResult = TProtoResultInternalWrapper<Fq::Private::WriteTaskResultResult>; +using TNodesHealthCheckResult = TProtoResultInternalWrapper<Fq::Private::NodesHealthCheckResult>; using TAsyncGetTaskResult = NThreading::TFuture<TGetTaskResult>; using TAsyncPingTaskResult = NThreading::TFuture<TPingTaskResult>; @@ -56,26 +57,26 @@ public: TPrivateClient( const NYdb::TDriver& driver, const NYdb::TCommonClientSettings& settings = NYdb::TCommonClientSettings(), - const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>()); + const NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<NMonitoring::TDynamicCounters>()); TAsyncGetTaskResult GetTask( - Yq::Private::GetTaskRequest&& request, + Fq::Private::GetTaskRequest&& request, const TGetTaskSettings& settings = TGetTaskSettings()); TAsyncPingTaskResult PingTask( - Yq::Private::PingTaskRequest&& request, + Fq::Private::PingTaskRequest&& request, const TPingTaskSettings& settings = TPingTaskSettings()); TAsyncWriteTaskResult WriteTaskResult( - Yq::Private::WriteTaskResultRequest&& request, + Fq::Private::WriteTaskResultRequest&& request, const TWriteTaskResultSettings& settings = TWriteTaskResultSettings()); TAsyncNodesHealthCheckResult NodesHealthCheck( - Yq::Private::NodesHealthCheckRequest&& request, + Fq::Private::NodesHealthCheckRequest&& request, const TNodesHealthCheckSettings& settings = TNodesHealthCheckSettings()); private: std::shared_ptr<TImpl> Impl; }; -} // namespace NYq +} // namespace NFq diff --git a/ydb/core/yq/libs/private_client/private_client_fq.cpp b/ydb/core/yq/libs/private_client/private_client_fq.cpp deleted file mode 100644 index f3d6992460e..00000000000 --- a/ydb/core/yq/libs/private_client/private_client_fq.cpp +++ /dev/null @@ -1,182 +0,0 @@ -#include "private_client_fq.h" -#include <ydb/library/yql/public/issue/yql_issue.h> -#include <ydb/library/yql/public/issue/yql_issue_message.h> -#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> - -namespace NFq { - -using namespace NYdb; - -class TPrivateClient::TImpl : public TClientImplCommon<TPrivateClient::TImpl> { -public: - TImpl( - std::shared_ptr<TGRpcConnectionsImpl>&& connections, - const TCommonClientSettings& settings, - const NMonitoring::TDynamicCounterPtr& counters) - : TClientImplCommon(std::move(connections), settings) - , Counters(counters->GetSubgroup("subsystem", "private_api")->GetSubgroup("subcomponent", "ClientMetrics")) - , PingTaskTime(Counters->GetHistogram("PingTaskMs", NMonitoring::ExponentialHistogram(10, 2, 50))) - , GetTaskTime(Counters->GetHistogram("GetTaskMs", NMonitoring::ExponentialHistogram(10, 2, 50))) - , WriteTaskResultTime(Counters->GetHistogram("WriteTaskResultMs", NMonitoring::ExponentialHistogram(10, 2, 50))) - , NodesHealthCheckTime(Counters->GetHistogram("NodesHealthCheckMs", NMonitoring::ExponentialHistogram(10, 2, 50))) - {} - - template<class TProtoResult, class TResultWrapper> - auto MakeResultExtractor(NThreading::TPromise<TResultWrapper> promise, const NMonitoring::THistogramPtr& hist, TInstant startedAt) { - return [=, promise = std::move(promise)] - (google::protobuf::Any* any, TPlainStatus status) mutable { - std::unique_ptr<TProtoResult> result; - if (any) { - result.reset(new TProtoResult); - any->UnpackTo(result.get()); - } - - hist->Collect((TInstant::Now() - startedAt).MilliSeconds()); - - promise.SetValue( - TResultWrapper( - TStatus(std::move(status)), - std::move(result))); - }; - } - - TAsyncPingTaskResult PingTask( - Fq::Private::PingTaskRequest&& request, - const TPingTaskSettings& settings) { - const auto startedAt = TInstant::Now(); - auto promise = NThreading::NewPromise<TPingTaskResult>(); - auto future = promise.GetFuture(); - - auto extractor = MakeResultExtractor< - Fq::Private::PingTaskResult, - TPingTaskResult>(std::move(promise), PingTaskTime, startedAt); - - Connections_->RunDeferred< - Fq::Private::V1::FqPrivateTaskService, - Fq::Private::PingTaskRequest, - Fq::Private::PingTaskResponse>( - std::move(request), - std::move(extractor), - &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncPingTask, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - settings.ClientTimeout_); - - return future; - } - - TAsyncGetTaskResult GetTask( - Fq::Private::GetTaskRequest&& request, - const TGetTaskSettings& settings) { - const auto startedAt = TInstant::Now(); - auto promise = NThreading::NewPromise<TGetTaskResult>(); - auto future = promise.GetFuture(); - auto extractor = MakeResultExtractor< - Fq::Private::GetTaskResult, - TGetTaskResult>(std::move(promise), GetTaskTime, startedAt); - - Connections_->RunDeferred< - Fq::Private::V1::FqPrivateTaskService, - Fq::Private::GetTaskRequest, - Fq::Private::GetTaskResponse>( - std::move(request), - std::move(extractor), - &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncGetTask, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - settings.ClientTimeout_); - - return future; - } - - TAsyncWriteTaskResult WriteTaskResult( - Fq::Private::WriteTaskResultRequest&& request, - const TWriteTaskResultSettings& settings) { - const auto startedAt = TInstant::Now(); - auto promise = NThreading::NewPromise<TWriteTaskResult>(); - auto future = promise.GetFuture(); - auto extractor = MakeResultExtractor< - Fq::Private::WriteTaskResultResult, - TWriteTaskResult>(std::move(promise), WriteTaskResultTime, startedAt); - - Connections_->RunDeferred< - Fq::Private::V1::FqPrivateTaskService, - Fq::Private::WriteTaskResultRequest, - Fq::Private::WriteTaskResultResponse>( - std::move(request), - std::move(extractor), - &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncWriteTaskResult, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - settings.ClientTimeout_); - - return future; - } - - TAsyncNodesHealthCheckResult NodesHealthCheck( - Fq::Private::NodesHealthCheckRequest&& request, - const TNodesHealthCheckSettings& settings) { - const auto startedAt = TInstant::Now(); - auto promise = NThreading::NewPromise<TNodesHealthCheckResult>(); - auto future = promise.GetFuture(); - auto extractor = MakeResultExtractor< - Fq::Private::NodesHealthCheckResult, - TNodesHealthCheckResult>(std::move(promise), NodesHealthCheckTime, startedAt); - - Connections_->RunDeferred< - Fq::Private::V1::FqPrivateTaskService, - Fq::Private::NodesHealthCheckRequest, - Fq::Private::NodesHealthCheckResponse>( - std::move(request), - std::move(extractor), - &Fq::Private::V1::FqPrivateTaskService::Stub::AsyncNodesHealthCheck, - DbDriverState_, - INITIAL_DEFERRED_CALL_DELAY, - TRpcRequestSettings::Make(settings), - settings.ClientTimeout_); - - return future; - } -private: - const NMonitoring::TDynamicCounterPtr Counters; - const NMonitoring::THistogramPtr PingTaskTime; - const NMonitoring::THistogramPtr GetTaskTime; - const NMonitoring::THistogramPtr WriteTaskResultTime; - const NMonitoring::THistogramPtr NodesHealthCheckTime; -}; - -TPrivateClient::TPrivateClient( - const TDriver& driver, - const TCommonClientSettings& settings, - const NMonitoring::TDynamicCounterPtr& counters) - : Impl(new TImpl(CreateInternalInterface(driver), settings, counters)) -{} - -TAsyncPingTaskResult TPrivateClient::PingTask( - Fq::Private::PingTaskRequest&& request, - const TPingTaskSettings& settings) { - return Impl->PingTask(std::move(request), settings); -} - -TAsyncGetTaskResult TPrivateClient::GetTask( - Fq::Private::GetTaskRequest&& request, - const TGetTaskSettings& settings) { - return Impl->GetTask(std::move(request), settings); -} - -TAsyncWriteTaskResult TPrivateClient::WriteTaskResult( - Fq::Private::WriteTaskResultRequest&& request, - const TWriteTaskResultSettings& settings) { - return Impl->WriteTaskResult(std::move(request), settings); -} - -TAsyncNodesHealthCheckResult TPrivateClient::NodesHealthCheck( - Fq::Private::NodesHealthCheckRequest&& request, - const TNodesHealthCheckSettings& settings) { - return Impl->NodesHealthCheck(std::move(request), settings); -} - -} //NFq diff --git a/ydb/core/yq/libs/private_client/private_client_fq.h b/ydb/core/yq/libs/private_client/private_client_fq.h deleted file mode 100644 index 2333821cf35..00000000000 --- a/ydb/core/yq/libs/private_client/private_client_fq.h +++ /dev/null @@ -1,82 +0,0 @@ -#pragma once - -#include <library/cpp/monlib/dynamic_counters/counters.h> - -#include <ydb/public/sdk/cpp/client/ydb_table/table.h> -#include <ydb/public/api/grpc/draft/yql_db_v1_fq.grpc.pb.h> -#include <ydb/core/yq/libs/protos/fq_private.pb.h> - -namespace NFq { - -template<class TProtoResult> -class TProtoResultInternalWrapper : public NYdb::TStatus { - friend class TPrivateClient; - -public: - TProtoResultInternalWrapper( - NYdb::TStatus&& status, - std::unique_ptr<TProtoResult> result) - : TStatus(std::move(status)) - , Result(std::move(result)) - { } - -public: - const TProtoResult& GetResult() const { - Y_VERIFY(Result, "Uninitialized result"); - return *Result; - } - - bool IsResultSet() const { - return Result ? true : false; - } - -private: - std::unique_ptr<TProtoResult> Result; -}; - - -using TGetTaskResult = TProtoResultInternalWrapper<Fq::Private::GetTaskResult>; -using TPingTaskResult = TProtoResultInternalWrapper<Fq::Private::PingTaskResult>; -using TWriteTaskResult = TProtoResultInternalWrapper<Fq::Private::WriteTaskResultResult>; -using TNodesHealthCheckResult = TProtoResultInternalWrapper<Fq::Private::NodesHealthCheckResult>; - -using TAsyncGetTaskResult = NThreading::TFuture<TGetTaskResult>; -using TAsyncPingTaskResult = NThreading::TFuture<TPingTaskResult>; -using TAsyncWriteTaskResult = NThreading::TFuture<TWriteTaskResult>; -using TAsyncNodesHealthCheckResult = NThreading::TFuture<TNodesHealthCheckResult>; - -struct TGetTaskSettings : public NYdb::TOperationRequestSettings<TGetTaskSettings> {}; -struct TPingTaskSettings : public NYdb::TOperationRequestSettings<TPingTaskSettings> {}; -struct TWriteTaskResultSettings : public NYdb::TOperationRequestSettings<TWriteTaskResultSettings> {}; -struct TNodesHealthCheckSettings : public NYdb::TOperationRequestSettings<TNodesHealthCheckSettings> {}; - -class TPrivateClient { - class TImpl; - -public: - TPrivateClient( - const NYdb::TDriver& driver, - const NYdb::TCommonClientSettings& settings = NYdb::TCommonClientSettings(), - const NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<NMonitoring::TDynamicCounters>()); - - TAsyncGetTaskResult GetTask( - Fq::Private::GetTaskRequest&& request, - const TGetTaskSettings& settings = TGetTaskSettings()); - - TAsyncPingTaskResult PingTask( - Fq::Private::PingTaskRequest&& request, - const TPingTaskSettings& settings = TPingTaskSettings()); - - TAsyncWriteTaskResult WriteTaskResult( - Fq::Private::WriteTaskResultRequest&& request, - const TWriteTaskResultSettings& settings = TWriteTaskResultSettings()); - - TAsyncNodesHealthCheckResult NodesHealthCheck( - Fq::Private::NodesHealthCheckRequest&& request, - const TNodesHealthCheckSettings& settings = TNodesHealthCheckSettings()); - -private: - std::shared_ptr<TImpl> Impl; -}; - -} // namespace NFq diff --git a/ydb/core/yq/libs/protos/CMakeLists.txt b/ydb/core/yq/libs/protos/CMakeLists.txt index 2d6538dedff..7ad41618c76 100644 --- a/ydb/core/yq/libs/protos/CMakeLists.txt +++ b/ydb/core/yq/libs/protos/CMakeLists.txt @@ -17,7 +17,6 @@ target_link_libraries(yq-libs-protos PUBLIC ) target_proto_messages(yq-libs-protos PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/protos/fq_private.proto - ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/protos/yq_private.proto ) target_proto_addincls(yq-libs-protos ./ diff --git a/ydb/core/yq/libs/protos/fq_private.proto b/ydb/core/yq/libs/protos/fq_private.proto index 95f486ae834..9ab2332b97d 100644 --- a/ydb/core/yq/libs/protos/fq_private.proto +++ b/ydb/core/yq/libs/protos/fq_private.proto @@ -11,11 +11,18 @@ import "ydb/public/api/protos/ydb_issue_message.proto"; import "ydb/public/api/protos/annotations/validation.proto"; import "ydb/public/api/protos/fq.proto"; +import "ydb/public/api/protos/yq.proto"; +import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; //////////////////////////////////////////////////////////// +message CompressedData { + string method = 1; + bytes data = 2; +} + message GetTaskRequest { string tenant = 1; string owner_id = 2; // guid, should be refreshed on node restart @@ -53,21 +60,21 @@ message GetTaskResult { uint64 generation = 4; bool streaming = 5; - repeated bytes dq_graph = 6; + repeated bytes dq_graph = 6; // deprecated: use dq_graph_compressed // text, connection and binding are empty if dq_graph is not empty string text = 7; - repeated FederatedQuery.Connection connection = 8; - repeated FederatedQuery.Binding binding = 9; + repeated YandexQuery.Connection connection = 8; + repeated YandexQuery.Binding binding = 9; string user_token = 10; // IAM token for debug repeated SignedIdentity service_accounts = 11; string user_id = 12; - FederatedQuery.QueryContent.QueryType query_type = 13; + YandexQuery.QueryContent.QueryType query_type = 13; string scope = 14; - FederatedQuery.ExecuteMode execute_mode = 15; - FederatedQuery.StateLoadMode state_load_mode = 16; - FederatedQuery.QueryMeta.ComputeStatus status = 17; - repeated FederatedQuery.ResultSetMeta result_set_meta = 18; + YandexQuery.ExecuteMode execute_mode = 15; + YandexQuery.StateLoadMode state_load_mode = 16; + YandexQuery.QueryMeta.ComputeStatus status = 17; + repeated YandexQuery.ResultSetMeta result_set_meta = 18; repeated TopicConsumer created_topic_consumers = 19; int32 dq_graph_index = 20; map<string, string> sensor_labels = 21; @@ -75,8 +82,14 @@ message GetTaskResult { bool automatic = 22; string query_name = 23; google.protobuf.Timestamp deadline = 24; - FederatedQuery.StreamingDisposition disposition = 25; + YandexQuery.StreamingDisposition disposition = 25; uint64 result_limit = 26; + + YandexQuery.Limits limits = 27; + string rate_limiter = 28; // Kesus path. If empty, rate limiting is off. + google.protobuf.Duration execution_limit = 29; + google.protobuf.Timestamp request_started_at = 30; + repeated CompressedData dq_graph_compressed = 31; } repeated Task tasks = 1; } @@ -85,37 +98,47 @@ message GetTaskResponse { Ydb.Operations.Operation operation = 1; // GetTaskResult } +message RateLimiterResources { + string rate_limiter = 1; // Kesus path + string resource_path = 2; + bool delete_resource = 3; // Delete resource when query is finished. Otherwise create +} + message PingTaskRequest { string owner_id = 1; SignedIdentity query_id = 2; SignedIdentity job_id = 3; SignedIdentity result_id = 4; - FederatedQuery.QueryMeta.ComputeStatus status = 5; - NYql.NDqProto.StatusIds.StatusCode status_code = 21; + YandexQuery.QueryMeta.ComputeStatus status = 5; repeated Ydb.Issue.IssueMessage issues = 6; - repeated Ydb.Issue.IssueMessage transient_issues = 16; uint32 result_set_count = 7; string statistics = 8; - repeated FederatedQuery.ResultSetMeta result_set_meta = 9; + repeated YandexQuery.ResultSetMeta result_set_meta = 9; string executer_info = 10; - repeated bytes dq_graph = 11; - int32 dq_graph_index = 20; - string ast = 12; - string plan = 13; + repeated bytes dq_graph = 11; // deprecated: use dq_graph_compressed + string ast = 12; // deprecated: use ast_compressed + string plan = 13; // deprecated: use plan_compressed bool resign_query = 14; - repeated TopicConsumer created_topic_consumers = 17; - FederatedQuery.StateLoadMode state_load_mode = 18; - FederatedQuery.StreamingDisposition disposition = 19; Ydb.Operations.OperationParams operation_params = 15; + repeated Ydb.Issue.IssueMessage transient_issues = 16; + repeated TopicConsumer created_topic_consumers = 17; + YandexQuery.StateLoadMode state_load_mode = 18; + YandexQuery.StreamingDisposition disposition = 19; + int32 dq_graph_index = 20; + NYql.NDqProto.StatusIds.StatusCode status_code = 21; + RateLimiterResources rate_limiter_resources = 22; + repeated CompressedData dq_graph_compressed = 23; + CompressedData ast_compressed = 24; + CompressedData plan_compressed = 25; string scope = 100; - string tenant = 104; google.protobuf.Timestamp started_at = 101; google.protobuf.Timestamp finished_at = 102; google.protobuf.Timestamp deadline = 103; + string tenant = 104; } message PingTaskResult { - FederatedQuery.QueryAction action = 1; + YandexQuery.QueryAction action = 1; } message PingTaskResponse { diff --git a/ydb/core/yq/libs/protos/yq_private.proto b/ydb/core/yq/libs/protos/yq_private.proto deleted file mode 100644 index fdcfd1944bb..00000000000 --- a/ydb/core/yq/libs/protos/yq_private.proto +++ /dev/null @@ -1,187 +0,0 @@ -syntax = "proto3"; -option cc_enable_arenas = true; - -package Yq.Private; - -import "ydb/library/yql/dq/actors/protos/dq_status_codes.proto"; - -import "ydb/public/api/protos/ydb_operation.proto"; -import "ydb/public/api/protos/ydb_value.proto"; -import "ydb/public/api/protos/ydb_issue_message.proto"; -import "ydb/public/api/protos/annotations/validation.proto"; - -import "ydb/public/api/protos/yq.proto"; - -import "google/protobuf/timestamp.proto"; -import "google/protobuf/duration.proto"; - -//////////////////////////////////////////////////////////// - -message CompressedData { - string method = 1; - bytes data = 2; -} - -message GetTaskRequest { - string tenant = 1; - string owner_id = 2; // guid, should be refreshed on node restart - string host = 3; - Ydb.Operations.OperationParams operation_params = 4; - - message Filter { - string query_executor = 1 [(Ydb.length).le = 100]; - } - Filter filter = 5; -} - -message SignedIdentity { - string value = 1; - string signature = 2; -} - -message TopicConsumer { - string database_id = 1; - string database = 2; - string topic_path = 3; - string consumer_name = 4; - string cluster_endpoint = 5; - bool use_ssl = 6; - string token_name = 7; - bool add_bearer_to_token = 8; -} - -message GetTaskResult { - message Task { - SignedIdentity result_id = 1; - SignedIdentity query_id = 2; - SignedIdentity job_id = 3; - uint64 generation = 4; - - bool streaming = 5; - repeated bytes dq_graph = 6; // deprecated: use dq_graph_compressed - // text, connection and binding are empty if dq_graph is not empty - string text = 7; - repeated YandexQuery.Connection connection = 8; - repeated YandexQuery.Binding binding = 9; - - string user_token = 10; // IAM token for debug - repeated SignedIdentity service_accounts = 11; - string user_id = 12; - YandexQuery.QueryContent.QueryType query_type = 13; - string scope = 14; - YandexQuery.ExecuteMode execute_mode = 15; - YandexQuery.StateLoadMode state_load_mode = 16; - YandexQuery.QueryMeta.ComputeStatus status = 17; - repeated YandexQuery.ResultSetMeta result_set_meta = 18; - repeated TopicConsumer created_topic_consumers = 19; - int32 dq_graph_index = 20; - map<string, string> sensor_labels = 21; - bool automatic = 22; - string query_name = 23; - google.protobuf.Timestamp deadline = 24; - YandexQuery.StreamingDisposition disposition = 25; - uint64 result_limit = 26; - YandexQuery.Limits limits = 27; - string rate_limiter = 28; // Kesus path. If empty, rate limiting is off. - google.protobuf.Duration execution_limit = 29; - google.protobuf.Timestamp request_started_at = 30; - repeated CompressedData dq_graph_compressed = 31; - } - repeated Task tasks = 1; -} - -message GetTaskResponse { - Ydb.Operations.Operation operation = 1; // GetTaskResult -} - -message RateLimiterResources { - string rate_limiter = 1; // Kesus path - string resource_path = 2; - bool delete_resource = 3; // Delete resource when query is finished. Otherwise create -} - -message PingTaskRequest { - string owner_id = 1; - SignedIdentity query_id = 2; - SignedIdentity job_id = 3; - SignedIdentity result_id = 4; - YandexQuery.QueryMeta.ComputeStatus status = 5; - repeated Ydb.Issue.IssueMessage issues = 6; - uint32 result_set_count = 7; - string statistics = 8; - repeated YandexQuery.ResultSetMeta result_set_meta = 9; - string executer_info = 10; - repeated bytes dq_graph = 11; // deprecated: use dq_graph_compressed - string ast = 12; // deprecated: use ast_compressed - string plan = 13; // deprecated: use plan_compressed - bool resign_query = 14; - Ydb.Operations.OperationParams operation_params = 15; - repeated Ydb.Issue.IssueMessage transient_issues = 16; - repeated TopicConsumer created_topic_consumers = 17; - YandexQuery.StateLoadMode state_load_mode = 18; - YandexQuery.StreamingDisposition disposition = 19; - int32 dq_graph_index = 20; - NYql.NDqProto.StatusIds.StatusCode status_code = 21; - RateLimiterResources rate_limiter_resources = 22; - repeated CompressedData dq_graph_compressed = 23; - CompressedData ast_compressed = 24; - CompressedData plan_compressed = 25; - string scope = 100; - google.protobuf.Timestamp started_at = 101; - google.protobuf.Timestamp finished_at = 102; - google.protobuf.Timestamp deadline = 103; - string tenant = 104; -} - -message PingTaskResult { - YandexQuery.QueryAction action = 1; -} - -message PingTaskResponse { - Ydb.Operations.Operation operation = 1; // PingTaskResult -} - -message WriteTaskResultRequest { - string owner_id = 1; - SignedIdentity result_id = 2; - Ydb.ResultSet result_set = 3; - uint32 result_set_id = 4; - uint64 offset = 5; - uint64 request_id = 6; - Ydb.Operations.OperationParams operation_params = 7; - google.protobuf.Timestamp deadline = 8; -} - -message WriteTaskResultResult { - uint64 request_id = 1; -} - -message WriteTaskResultResponse { - Ydb.Operations.Operation operation = 1; // WriteRowsResultResult -} - -message NodeInfo { - uint32 node_id = 1; - string instance_id = 2; - string hostname = 3; - uint64 active_workers = 4; - uint64 memory_limit = 5; - uint64 memory_allocated = 6; - uint32 interconnect_port = 7; - string node_address = 8; - string data_center = 9; -} - -message NodesHealthCheckRequest { - string tenant = 1; - NodeInfo node = 2; - Ydb.Operations.OperationParams operation_params = 6; -} - -message NodesHealthCheckResult { - repeated NodeInfo nodes = 1; -} - -message NodesHealthCheckResponse { - Ydb.Operations.Operation operation = 1; // NodesHealthCheckResult -} diff --git a/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp b/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp index 135527411ef..32927270d3f 100644 --- a/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp +++ b/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp @@ -66,7 +66,7 @@ public: NActors::TActorId owner, TString queryId, NYdb::TDriver ydbDriver, - Yq::Private::TopicConsumer topic, + Fq::Private::TopicConsumer topic, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProvider, ui64 index, size_t maxRetries @@ -168,7 +168,7 @@ private: private: const NActors::TActorId Owner; const TString QueryId; - const Yq::Private::TopicConsumer Topic; + const Fq::Private::TopicConsumer Topic; NYdb::TDriver YdbDriver; NYdb::NPersQueue::TPersQueueClient PqClient; ui64 Index = 0; @@ -183,7 +183,7 @@ public: NActors::TActorId owner, TString queryId, NYdb::TDriver ydbDriver, - TVector<Yq::Private::TopicConsumer> topics, + TVector<Fq::Private::TopicConsumer> topics, TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials, size_t maxRetries ) @@ -256,7 +256,7 @@ private: const NActors::TActorId Owner; const TString QueryId; NYdb::TDriver YdbDriver; - const TVector<Yq::Private::TopicConsumer> Topics; + const TVector<Fq::Private::TopicConsumer> Topics; const TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> Credentials; const size_t MaxRetries; size_t ResultsGot = 0; @@ -271,7 +271,7 @@ NActors::IActor* MakeReadRuleDeleterActor( NActors::TActorId owner, TString queryId, NYdb::TDriver ydbDriver, - TVector<Yq::Private::TopicConsumer> topics, + TVector<Fq::Private::TopicConsumer> topics, TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials, // For each topic size_t maxRetries ) diff --git a/ydb/core/yq/libs/read_rule/read_rule_deleter.h b/ydb/core/yq/libs/read_rule/read_rule_deleter.h index c704109ac41..69a2f256802 100644 --- a/ydb/core/yq/libs/read_rule/read_rule_deleter.h +++ b/ydb/core/yq/libs/read_rule/read_rule_deleter.h @@ -1,5 +1,5 @@ #pragma once -#include <ydb/core/yq/libs/protos/yq_private.pb.h> +#include <ydb/core/yq/libs/protos/fq_private.pb.h> #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> @@ -11,7 +11,7 @@ NActors::IActor* MakeReadRuleDeleterActor( NActors::TActorId owner, TString queryId, NYdb::TDriver ydbDriver, - TVector<Yq::Private::TopicConsumer> topics, + TVector<Fq::Private::TopicConsumer> topics, TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials, // For each topic size_t maxRetries = 15 ); diff --git a/ydb/public/api/grpc/draft/CMakeLists.txt b/ydb/public/api/grpc/draft/CMakeLists.txt index f048ca35d81..2093014f616 100644 --- a/ydb/public/api/grpc/draft/CMakeLists.txt +++ b/ydb/public/api/grpc/draft/CMakeLists.txt @@ -29,7 +29,6 @@ target_proto_messages(api-grpc-draft PRIVATE ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_s3_internal_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_long_tx_v1.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/ydb_logstore_v1.proto - ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/yql_db_v1_fq.proto ${CMAKE_SOURCE_DIR}/ydb/public/api/grpc/draft/yql_db_v1.proto ) target_proto_addincls(api-grpc-draft diff --git a/ydb/public/api/grpc/draft/yql_db_v1.proto b/ydb/public/api/grpc/draft/yql_db_v1.proto index 3173fe53ad0..cc403e93a43 100644 --- a/ydb/public/api/grpc/draft/yql_db_v1.proto +++ b/ydb/public/api/grpc/draft/yql_db_v1.proto @@ -1,19 +1,19 @@ syntax = "proto3"; -package Yq.Private.V1; +package Fq.Private.V1; -import "ydb/core/yq/libs/protos/yq_private.proto"; +import "ydb/core/yq/libs/protos/fq_private.proto"; -service YqPrivateTaskService { +service FqPrivateTaskService { // gets new task - rpc GetTask(Yq.Private.GetTaskRequest) returns (Yq.Private.GetTaskResponse); + rpc GetTask(Fq.Private.GetTaskRequest) returns (Fq.Private.GetTaskResponse); // pings new task (also can update metadata) - rpc PingTask(Yq.Private.PingTaskRequest) returns (Yq.Private.PingTaskResponse); + rpc PingTask(Fq.Private.PingTaskRequest) returns (Fq.Private.PingTaskResponse); // writes rows - rpc WriteTaskResult(Yq.Private.WriteTaskResultRequest) returns (Yq.Private.WriteTaskResultResponse); + rpc WriteTaskResult(Fq.Private.WriteTaskResultRequest) returns (Fq.Private.WriteTaskResultResponse); //Nodes - rpc NodesHealthCheck(Yq.Private.NodesHealthCheckRequest) returns (Yq.Private.NodesHealthCheckResponse); + rpc NodesHealthCheck(Fq.Private.NodesHealthCheckRequest) returns (Fq.Private.NodesHealthCheckResponse); } diff --git a/ydb/public/api/grpc/draft/yql_db_v1_fq.proto b/ydb/public/api/grpc/draft/yql_db_v1_fq.proto deleted file mode 100644 index cc403e93a43..00000000000 --- a/ydb/public/api/grpc/draft/yql_db_v1_fq.proto +++ /dev/null @@ -1,19 +0,0 @@ -syntax = "proto3"; - -package Fq.Private.V1; - -import "ydb/core/yq/libs/protos/fq_private.proto"; - -service FqPrivateTaskService { - // gets new task - rpc GetTask(Fq.Private.GetTaskRequest) returns (Fq.Private.GetTaskResponse); - - // pings new task (also can update metadata) - rpc PingTask(Fq.Private.PingTaskRequest) returns (Fq.Private.PingTaskResponse); - - // writes rows - rpc WriteTaskResult(Fq.Private.WriteTaskResultRequest) returns (Fq.Private.WriteTaskResultResponse); - - //Nodes - rpc NodesHealthCheck(Fq.Private.NodesHealthCheckRequest) returns (Fq.Private.NodesHealthCheckResponse); -} diff --git a/ydb/services/fq/private_grpc.h b/ydb/services/fq/private_grpc.h index ddd5252a0e2..758a298d5c8 100644 --- a/ydb/services/fq/private_grpc.h +++ b/ydb/services/fq/private_grpc.h @@ -2,7 +2,7 @@ #include <library/cpp/actors/core/actorsystem.h> #include <library/cpp/grpc/server/grpc_server.h> -#include <ydb/public/api/grpc/draft/yql_db_v1_fq.grpc.pb.h> +#include <ydb/public/api/grpc/draft/yql_db_v1.grpc.pb.h> namespace NKikimr { namespace NGRpcService { diff --git a/ydb/services/fq/ut_integration/fq_ut.cpp b/ydb/services/fq/ut_integration/fq_ut.cpp index b7024005d90..e58fcc9e937 100644 --- a/ydb/services/fq/ut_integration/fq_ut.cpp +++ b/ydb/services/fq/ut_integration/fq_ut.cpp @@ -5,7 +5,7 @@ #include <ydb/public/lib/fq/helpers.h> #include <ydb/core/yq/libs/db_schema/db_schema.h> #include <ydb/core/yq/libs/mock/yql_mock.h> -#include <ydb/core/yq/libs/private_client/private_client_fq.h> +#include <ydb/core/yq/libs/private_client/private_client.h> #include <ydb/core/yq/libs/control_plane_storage/message_builders.h> #include <ydb/core/yq/libs/actors/database_resolver.h> @@ -941,7 +941,7 @@ Y_UNIT_TEST_SUITE(PrivateApi) { req.mutable_query_id()->set_value("id"); req.set_scope(scope.ToString()); req.set_owner_id("some_owner"); - req.set_status(FederatedQuery::QueryMeta::COMPLETED); + req.set_status(YandexQuery::QueryMeta::COMPLETED); auto result = client.PingTask(std::move(req)).ExtractValueSync(); result.GetIssues().PrintTo(Cerr); UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::GENERIC_ERROR); diff --git a/ydb/services/yq/CMakeLists.txt b/ydb/services/yq/CMakeLists.txt index aa8ee401411..70ae7cc47c3 100644 --- a/ydb/services/yq/CMakeLists.txt +++ b/ydb/services/yq/CMakeLists.txt @@ -20,5 +20,4 @@ target_link_libraries(ydb-services-yq PUBLIC ) target_sources(ydb-services-yq PRIVATE ${CMAKE_SOURCE_DIR}/ydb/services/yq/grpc_service.cpp - ${CMAKE_SOURCE_DIR}/ydb/services/yq/private_grpc.cpp ) diff --git a/ydb/services/yq/private_grpc.cpp b/ydb/services/yq/private_grpc.cpp deleted file mode 100644 index da64eddbbb4..00000000000 --- a/ydb/services/yq/private_grpc.cpp +++ /dev/null @@ -1,66 +0,0 @@ -#include "private_grpc.h" - -#include <ydb/core/grpc_services/base/base.h> -#include <ydb/core/grpc_services/grpc_helper.h> -#include <ydb/core/grpc_services/service_analytics_internal.h> -#include <ydb/library/protobuf_printer/security_printer.h> - -namespace NKikimr { -namespace NGRpcService { - -TGRpcYqPrivateTaskService::TGRpcYqPrivateTaskService(NActors::TActorSystem *system, - TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, NActors::TActorId id) - : ActorSystem_(system) - , Counters_(counters) - , GRpcRequestProxyId_(id) {} - -void TGRpcYqPrivateTaskService::InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) { - CQ_ = cq; - SetupIncomingRequests(std::move(logger)); -} - -void TGRpcYqPrivateTaskService::SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) { - Limiter_ = limiter; -} - -bool TGRpcYqPrivateTaskService::IncRequest() { - return Limiter_->Inc(); -} - -void TGRpcYqPrivateTaskService::DecRequest() { - Limiter_->Dec(); - Y_ASSERT(Limiter_->GetCurrentInFlight() >= 0); -} - -void TGRpcYqPrivateTaskService::SetupIncomingRequests(NGrpc::TLoggerPtr logger) { - auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); - -#ifdef ADD_REQUEST -#error ADD_REQUEST macro already defined -#endif -#define ADD_REQUEST(NAME, CB) \ -MakeIntrusive<TGRpcRequest<Yq::Private::NAME##Request, Yq::Private::NAME##Response, TGRpcYqPrivateTaskService, TSecurityTextFormatPrinter<Yq::Private::NAME##Request>, TSecurityTextFormatPrinter<Yq::Private::NAME##Response>>>( \ - this, &Service_, CQ_, \ - [this](NGrpc::IRequestContextBase *ctx) { \ - NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ - ActorSystem_->Send(GRpcRequestProxyId_, \ - new TGrpcRequestOperationCall<Yq::Private::NAME##Request, Yq::Private::NAME##Response> \ - (ctx, &CB)); \ - }, \ - &Yq::Private::V1::YqPrivateTaskService::AsyncService::Request##NAME, \ - #NAME, logger, getCounterBlock("yql_internal", #NAME)) \ - ->Run(); \ - - ADD_REQUEST(PingTask, DoYqPrivatePingTaskRequest) - - ADD_REQUEST(GetTask, DoYqPrivateGetTaskRequest) - - ADD_REQUEST(WriteTaskResult, DoYqPrivateWriteTaskResultRequest) - - ADD_REQUEST(NodesHealthCheck, DoYqPrivateNodesHealthCheckRequest) - -#undef ADD_REQUEST -} - -} // namespace NGRpcService -} // namespace NKikimr diff --git a/ydb/services/yq/private_grpc.h b/ydb/services/yq/private_grpc.h deleted file mode 100644 index 7b815506607..00000000000 --- a/ydb/services/yq/private_grpc.h +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once - -#include <library/cpp/actors/core/actorsystem.h> -#include <library/cpp/grpc/server/grpc_server.h> -#include <ydb/public/api/grpc/draft/yql_db_v1.grpc.pb.h> - -namespace NKikimr { -namespace NGRpcService { - -class TGRpcYqPrivateTaskService - : public NGrpc::TGrpcServiceBase<Yq::Private::V1::YqPrivateTaskService> -{ -public: - TGRpcYqPrivateTaskService(NActors::TActorSystem* system, TIntrusivePtr<::NMonitoring::TDynamicCounters> counters, - NActors::TActorId id); - - void InitService(grpc::ServerCompletionQueue* cq, NGrpc::TLoggerPtr logger) override; - void SetGlobalLimiterHandle(NGrpc::TGlobalLimiter* limiter) override; - - bool IncRequest(); - void DecRequest(); -private: - void SetupIncomingRequests(NGrpc::TLoggerPtr logger); - - NActors::TActorSystem* ActorSystem_; - grpc::ServerCompletionQueue* CQ_ = nullptr; - - TIntrusivePtr<::NMonitoring::TDynamicCounters> Counters_; - NActors::TActorId GRpcRequestProxyId_; - NGrpc::TGlobalLimiter* Limiter_ = nullptr; -}; - -} // namespace NGRpcService -} // namespace NKikimr diff --git a/ydb/services/yq/ut_integration/yq_ut.cpp b/ydb/services/yq/ut_integration/yq_ut.cpp index 5a129b8fa35..470d2bf8354 100644 --- a/ydb/services/yq/ut_integration/yq_ut.cpp +++ b/ydb/services/yq/ut_integration/yq_ut.cpp @@ -932,12 +932,12 @@ Y_UNIT_TEST_SUITE(PrivateApi) { ui16 grpc = server.GetPort(); TString location = TStringBuilder() << "localhost:" << grpc; auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); - ::NYq::TPrivateClient client(driver); + ::NFq::TPrivateClient client(driver); const TString historyId = "id"; const TString folderId = "folder_id"; const TScope scope(folderId); { - Yq::Private::PingTaskRequest req; + Fq::Private::PingTaskRequest req; req.mutable_query_id()->set_value("id"); req.set_scope(scope.ToString()); req.set_owner_id("some_owner"); @@ -953,9 +953,9 @@ Y_UNIT_TEST_SUITE(PrivateApi) { ui16 grpc = server.GetPort(); TString location = TStringBuilder() << "localhost:" << grpc; auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); - ::NYq::TPrivateClient client(driver); + ::NFq::TPrivateClient client(driver); { - Yq::Private::GetTaskRequest req; + Fq::Private::GetTaskRequest req; req.set_owner_id("owner_id"); req.set_host("host"); auto result = client.GetTask(std::move(req)).ExtractValueSync(); @@ -970,10 +970,10 @@ Y_UNIT_TEST_SUITE(PrivateApi) { ui16 grpc = server.GetPort(); TString location = TStringBuilder() << "localhost:" << grpc; auto driver = TDriver(TDriverConfig().SetEndpoint(location).SetAuthToken("root@builtin")); - ::NYq::TPrivateClient client(driver); + ::NFq::TPrivateClient client(driver); const auto instanceId = CreateGuidAsString(); { - Yq::Private::NodesHealthCheckRequest req; + Fq::Private::NodesHealthCheckRequest req; req.set_tenant("Tenant"); auto& node = *req.mutable_node(); node.set_hostname("hostname"); |