diff options
author | Aleksandr Khoroshilov <[email protected]> | 2022-05-19 21:01:57 +0300 |
---|---|---|
committer | Aleksandr Khoroshilov <[email protected]> | 2022-05-19 21:01:57 +0300 |
commit | 0393681f68a46b340f802ada83a7b88b2e46d648 (patch) | |
tree | 0c79ba0be8fa440a121d9de0882cca253cc2cd44 | |
parent | fca1de64c46a64296254b2910846fc3da50f0e4c (diff) |
Loopback Internal Service for collocated (yellow+red) planes
ref:60fc5fd19934c5d2aba4e7b49a22679a0f1b3ce0
-rw-r--r-- | ydb/core/yq/libs/config/protos/private_api.proto | 1 | ||||
-rw-r--r-- | ydb/core/yq/libs/init/init.cpp | 13 | ||||
-rw-r--r-- | ydb/core/yq/libs/private_client/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/yq/libs/private_client/events.h | 73 | ||||
-rw-r--r-- | ydb/core/yq/libs/private_client/internal_service.h | 54 | ||||
-rw-r--r-- | ydb/core/yq/libs/private_client/loopback_service.cpp | 105 | ||||
-rw-r--r-- | ydb/core/yq/libs/private_client/loopback_service.h | 12 |
7 files changed, 201 insertions, 58 deletions
diff --git a/ydb/core/yq/libs/config/protos/private_api.proto b/ydb/core/yq/libs/config/protos/private_api.proto index 62c47c03ca5..21c5af86ac1 100644 --- a/ydb/core/yq/libs/config/protos/private_api.proto +++ b/ydb/core/yq/libs/config/protos/private_api.proto @@ -13,4 +13,5 @@ message TPrivateApiConfig { bool SecureTaskService = 4; string SaKeyFile = 5; string IamEndpoint = 6; + bool Loopback = 7; } diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 7fc0e22acc1..11cf9146a19 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -9,6 +9,7 @@ #include <ydb/core/yq/libs/health/health.h> #include <ydb/core/yq/libs/checkpoint_storage/storage_service.h> #include <ydb/core/yq/libs/private_client/internal_service.h> +#include <ydb/core/yq/libs/private_client/loopback_service.h> #include <ydb/core/yq/libs/quota_manager/quota_manager.h> #include <ydb/core/yq/libs/shared_resources/shared_resources.h> #include <ydb/library/folder_service/folder_service.h> @@ -178,11 +179,13 @@ void Init( ::NYql::NCommon::TServiceCounters serviceCounters(appData->Counters); if (protoConfig.GetNodesManager().GetEnabled() || protoConfig.GetPendingFetcher().GetEnabled()) { - auto internal = CreateInternalServiceActor( - yqSharedResources, - credentialsProviderFactory, - protoConfig.GetPrivateApi(), - clientCounters + auto internal = protoConfig.GetPrivateApi().GetLoopback() + ? CreateLoopbackServiceActor(clientCounters) + : CreateInternalServiceActor( + yqSharedResources, + credentialsProviderFactory, + protoConfig.GetPrivateApi(), + clientCounters ); actorRegistrator(MakeInternalServiceActorId(), internal); } diff --git a/ydb/core/yq/libs/private_client/CMakeLists.txt b/ydb/core/yq/libs/private_client/CMakeLists.txt index 734322131c4..0f5f23c5b4d 100644 --- a/ydb/core/yq/libs/private_client/CMakeLists.txt +++ b/ydb/core/yq/libs/private_client/CMakeLists.txt @@ -24,5 +24,6 @@ target_link_libraries(yq-libs-private_client PUBLIC ) target_sources(yq-libs-private_client PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/internal_service.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/loopback_service.cpp ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/private_client.cpp ) diff --git a/ydb/core/yq/libs/private_client/events.h b/ydb/core/yq/libs/private_client/events.h new file mode 100644 index 00000000000..073a097435a --- /dev/null +++ b/ydb/core/yq/libs/private_client/events.h @@ -0,0 +1,73 @@ +#pragma once + +#include "private_client.h" + +#include <library/cpp/actors/core/actor.h> +#include <library/cpp/actors/core/event_local.h> + +#include <library/cpp/monlib/dynamic_counters/counters.h> +#include <ydb/core/yq/libs/events/event_subspace.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +#include <ydb/core/yq/libs/shared_resources/shared_resources.h> + +#include <ydb/core/yq/libs/control_plane_storage/proto/yq_internal.pb.h> + +namespace NYq { + +struct TEvInternalService { + // Event ids. + enum EEv : ui32 { + EvHealthCheckRequest = YqEventSubspaceBegin(NYq::TYqEventSubspace::InternalService), + EvHealthCheckResponse, + EvGetTaskRequest, + EvGetTaskResponse, + EvPingTaskRequest, + EvPingTaskResponse, + EvWriteResultRequest, + EvWriteResultResponse, + EvEnd, + }; + + static_assert(EvEnd <= YqEventSubspaceEnd(NYq::TYqEventSubspace::InternalService), "All events must be in their subspace"); + + template <class TProtoRequest, ui32 TEventType> + struct TInternalServiceRequestEvent : public NActors::TEventLocal<TInternalServiceRequestEvent<TProtoRequest, TEventType>, TEventType> { + TProtoRequest Request; + TInstant SentAt; + explicit TInternalServiceRequestEvent(const TProtoRequest& request) + : Request(request), SentAt(Now()) + { } + }; + + using TEvHealthCheckRequest = TInternalServiceRequestEvent<Yq::Private::NodesHealthCheckRequest, EvHealthCheckRequest>; + using TEvGetTaskRequest = TInternalServiceRequestEvent<Yq::Private::GetTaskRequest, EvGetTaskRequest>; + using TEvPingTaskRequest = TInternalServiceRequestEvent<Yq::Private::PingTaskRequest, EvPingTaskRequest>; + using TEvWriteResultRequest = TInternalServiceRequestEvent<Yq::Private::WriteTaskResultRequest, EvWriteResultRequest>; + + template <class TProtoResult, ui32 TEventType> + struct TInternalServiceResponseEvent : public NActors::TEventLocal<TInternalServiceResponseEvent<TProtoResult, TEventType>, TEventType> { + NYdb::TStatus Status; + TProtoResult Result; + explicit TInternalServiceResponseEvent(const TProtoResultInternalWrapper<TProtoResult>& wrappedResult) : Status(wrappedResult) { + if (wrappedResult.IsResultSet()) { + Result = wrappedResult.GetResult(); + } + } + explicit TInternalServiceResponseEvent(const TString& errorMessage) : Status(NYdb::EStatus::INTERNAL_ERROR, {NYql::TIssue(errorMessage).SetCode(NYql::UNEXPECTED_ERROR, NYql::TSeverityIds::S_ERROR)}) { + } + explicit TInternalServiceResponseEvent(const TProtoResult& result) : Status(NYdb::EStatus::SUCCESS, NYql::TIssues()), Result(result) { + } + TInternalServiceResponseEvent(NYdb::EStatus statusCode, NYql::TIssues&& issues) : Status(statusCode, std::move(issues)) { + } + }; + + using TEvHealthCheckResponse = TInternalServiceResponseEvent<Yq::Private::NodesHealthCheckResult, EvHealthCheckResponse>; + using TEvGetTaskResponse = TInternalServiceResponseEvent<Yq::Private::GetTaskResult, EvGetTaskResponse>; + using TEvPingTaskResponse = TInternalServiceResponseEvent<Yq::Private::PingTaskResult, EvPingTaskResponse>; + using TEvWriteResultResponse = TInternalServiceResponseEvent<Yq::Private::WriteTaskResultResult, EvWriteResultResponse>; +}; + +NActors::TActorId MakeInternalServiceActorId(); + +} /* NYq */ diff --git a/ydb/core/yq/libs/private_client/internal_service.h b/ydb/core/yq/libs/private_client/internal_service.h index 5bad866dfe0..fc65c22abe3 100644 --- a/ydb/core/yq/libs/private_client/internal_service.h +++ b/ydb/core/yq/libs/private_client/internal_service.h @@ -1,5 +1,6 @@ #pragma once +#include "events.h" #include "private_client.h" #include <library/cpp/actors/core/actor.h> @@ -15,59 +16,6 @@ namespace NYq { -struct TEvInternalService { - // Event ids. - enum EEv : ui32 { - EvHealthCheckRequest = YqEventSubspaceBegin(NYq::TYqEventSubspace::InternalService), - EvHealthCheckResponse, - EvGetTaskRequest, - EvGetTaskResponse, - EvPingTaskRequest, - EvPingTaskResponse, - EvWriteResultRequest, - EvWriteResultResponse, - EvEnd, - }; - - static_assert(EvEnd <= YqEventSubspaceEnd(NYq::TYqEventSubspace::InternalService), "All events must be in their subspace"); - - template <class TProtoRequest, ui32 TEventType> - struct TInternalServiceRequestEvent : public NActors::TEventLocal<TInternalServiceRequestEvent<TProtoRequest, TEventType>, TEventType> { - TProtoRequest Request; - TInstant SentAt; - explicit TInternalServiceRequestEvent(const TProtoRequest& request) - : Request(request), SentAt(Now()) - { } - }; - - using TEvHealthCheckRequest = TInternalServiceRequestEvent<Yq::Private::NodesHealthCheckRequest, EvHealthCheckRequest>; - using TEvGetTaskRequest = TInternalServiceRequestEvent<Yq::Private::GetTaskRequest, EvGetTaskRequest>; - using TEvPingTaskRequest = TInternalServiceRequestEvent<Yq::Private::PingTaskRequest, EvPingTaskRequest>; - using TEvWriteResultRequest = TInternalServiceRequestEvent<Yq::Private::WriteTaskResultRequest, EvWriteResultRequest>; - - template <class TProtoResult, ui32 TEventType> - struct TInternalServiceResponseEvent : public NActors::TEventLocal<TInternalServiceResponseEvent<TProtoResult, TEventType>, TEventType> { - NYdb::TStatus Status; - TProtoResult Result; - TInternalServiceResponseEvent(const TProtoResultInternalWrapper<TProtoResult>& wrappedResult) : Status(wrappedResult) - { - if (wrappedResult.IsResultSet()) { - Result = wrappedResult.GetResult(); - } - } - TInternalServiceResponseEvent(const TString& errorMessage) : Status(NYdb::EStatus::INTERNAL_ERROR, {NYql::TIssue(errorMessage).SetCode(NYql::UNEXPECTED_ERROR, NYql::TSeverityIds::S_ERROR)}) - { - } - }; - - using TEvHealthCheckResponse = TInternalServiceResponseEvent<Yq::Private::NodesHealthCheckResult, EvHealthCheckResponse>; - using TEvGetTaskResponse = TInternalServiceResponseEvent<Yq::Private::GetTaskResult, EvGetTaskResponse>; - using TEvPingTaskResponse = TInternalServiceResponseEvent<Yq::Private::PingTaskResult, EvPingTaskResponse>; - using TEvWriteResultResponse = TInternalServiceResponseEvent<Yq::Private::WriteTaskResultResult, EvWriteResultResponse>; -}; - -NActors::TActorId MakeInternalServiceActorId(); - NActors::IActor* CreateInternalServiceActor( const NYq::TYqSharedResources::TPtr& yqSharedResources, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, diff --git a/ydb/core/yq/libs/private_client/loopback_service.cpp b/ydb/core/yq/libs/private_client/loopback_service.cpp new file mode 100644 index 00000000000..0187ff44ded --- /dev/null +++ b/ydb/core/yq/libs/private_client/loopback_service.cpp @@ -0,0 +1,105 @@ +#include "loopback_service.h" + +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/log.h> + +#include <ydb/core/protos/services.pb.h> + +#include <ydb/core/yq/libs/control_plane_storage/control_plane_storage.h> +#include <ydb/core/yq/libs/control_plane_storage/events/events.h> + +#define LOG_E(stream) \ + LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::FQ_INTERNAL_SERVICE, stream) +#define LOG_W(stream) \ + LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::FQ_INTERNAL_SERVICE, stream) +#define LOG_I(stream) \ + LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::FQ_INTERNAL_SERVICE, stream) +#define LOG_D(stream) \ + LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::FQ_INTERNAL_SERVICE, stream) + +namespace NYq { + +class TLoopbackService : public NActors::TActorBootstrapped<TLoopbackService> { +public: + TLoopbackService( + const NMonitoring::TDynamicCounterPtr& counters) + : ServiceCounters(counters->GetSubgroup("subsystem", "LoopbackService")) + { + } + + static constexpr char ActorName[] = "FQ_LOOPBACK_SERVICE"; + + void Bootstrap() { + Become(&TLoopbackService::StateFunc); + LOG_I("STARTED"); + } + +private: + STRICT_STFUNC(StateFunc, + hFunc(TEvInternalService::TEvHealthCheckRequest, Handle) + hFunc(TEvInternalService::TEvGetTaskRequest, Handle) + hFunc(TEvInternalService::TEvPingTaskRequest, Handle) + hFunc(TEvInternalService::TEvWriteResultRequest, Handle) + + hFunc(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse, Handle) + ); + + void Handle(TEvInternalService::TEvHealthCheckRequest::TPtr& ev) { + Cookie++; + Senders[Cookie] = ev->Sender; + auto request = ev->Get()->Request; + Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvNodesHealthCheckRequest(std::move(request)), 0, Cookie); + } + + void Handle(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse::TPtr& ev) { + auto it = Senders.find(ev->Cookie); + if (it != Senders.end()) { + if (ev->Get()->Issues.Size() == 0) { + Send(it->second, new TEvInternalService::TEvHealthCheckResponse(ev->Get()->Record)); + } else { + auto issues = ev->Get()->Issues; + Send(it->second, new TEvInternalService::TEvHealthCheckResponse(NYdb::EStatus::INTERNAL_ERROR, std::move(issues))); + } + Senders.erase(it); + } + } + + void Handle(TEvInternalService::TEvGetTaskRequest::TPtr& /*ev*/) { + /* + Cookie++; + Senders[Cookie] = ev->Sender; + auto request = ev->Get()->Request; + Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(std::move(request)), 0, Cookie); + */ + } + + void Handle(TEvInternalService::TEvPingTaskRequest::TPtr& /*ev*/) { + /* + Cookie++; + Senders[Cookie] = ev->Sender; + auto request = ev->Get()->Request; + Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvPingTaskRequest(std::move(request)), 0, Cookie); + */ + } + + void Handle(TEvInternalService::TEvWriteResultRequest::TPtr& /*ev*/) { + /* + Cookie++; + Senders[Cookie] = ev->Sender; + auto request = ev->Get()->Request; + Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvWriteResultRequest(std::move(request)), 0, Cookie); + */ + } + + const NMonitoring::TDynamicCounterPtr ServiceCounters; + ui32 Cookie = 0; + THashMap<ui32, NActors::TActorId> Senders; +}; + +NActors::IActor* CreateLoopbackServiceActor( + const NMonitoring::TDynamicCounterPtr& counters) { + return new TLoopbackService(counters); +} + +} /* NYq */ diff --git a/ydb/core/yq/libs/private_client/loopback_service.h b/ydb/core/yq/libs/private_client/loopback_service.h new file mode 100644 index 00000000000..7bf15de7991 --- /dev/null +++ b/ydb/core/yq/libs/private_client/loopback_service.h @@ -0,0 +1,12 @@ +#pragma once + +#include "events.h" + +#include <library/cpp/monlib/dynamic_counters/counters.h> + +namespace NYq { + +NActors::IActor* CreateLoopbackServiceActor( + const NMonitoring::TDynamicCounterPtr& counters); + +} /* NYq */ |