summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAleksandr Khoroshilov <[email protected]>2022-05-19 21:01:57 +0300
committerAleksandr Khoroshilov <[email protected]>2022-05-19 21:01:57 +0300
commit0393681f68a46b340f802ada83a7b88b2e46d648 (patch)
tree0c79ba0be8fa440a121d9de0882cca253cc2cd44
parentfca1de64c46a64296254b2910846fc3da50f0e4c (diff)
Loopback Internal Service for collocated (yellow+red) planes
ref:60fc5fd19934c5d2aba4e7b49a22679a0f1b3ce0
-rw-r--r--ydb/core/yq/libs/config/protos/private_api.proto1
-rw-r--r--ydb/core/yq/libs/init/init.cpp13
-rw-r--r--ydb/core/yq/libs/private_client/CMakeLists.txt1
-rw-r--r--ydb/core/yq/libs/private_client/events.h73
-rw-r--r--ydb/core/yq/libs/private_client/internal_service.h54
-rw-r--r--ydb/core/yq/libs/private_client/loopback_service.cpp105
-rw-r--r--ydb/core/yq/libs/private_client/loopback_service.h12
7 files changed, 201 insertions, 58 deletions
diff --git a/ydb/core/yq/libs/config/protos/private_api.proto b/ydb/core/yq/libs/config/protos/private_api.proto
index 62c47c03ca5..21c5af86ac1 100644
--- a/ydb/core/yq/libs/config/protos/private_api.proto
+++ b/ydb/core/yq/libs/config/protos/private_api.proto
@@ -13,4 +13,5 @@ message TPrivateApiConfig {
bool SecureTaskService = 4;
string SaKeyFile = 5;
string IamEndpoint = 6;
+ bool Loopback = 7;
}
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 7fc0e22acc1..11cf9146a19 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -9,6 +9,7 @@
#include <ydb/core/yq/libs/health/health.h>
#include <ydb/core/yq/libs/checkpoint_storage/storage_service.h>
#include <ydb/core/yq/libs/private_client/internal_service.h>
+#include <ydb/core/yq/libs/private_client/loopback_service.h>
#include <ydb/core/yq/libs/quota_manager/quota_manager.h>
#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
#include <ydb/library/folder_service/folder_service.h>
@@ -178,11 +179,13 @@ void Init(
::NYql::NCommon::TServiceCounters serviceCounters(appData->Counters);
if (protoConfig.GetNodesManager().GetEnabled() || protoConfig.GetPendingFetcher().GetEnabled()) {
- auto internal = CreateInternalServiceActor(
- yqSharedResources,
- credentialsProviderFactory,
- protoConfig.GetPrivateApi(),
- clientCounters
+ auto internal = protoConfig.GetPrivateApi().GetLoopback()
+ ? CreateLoopbackServiceActor(clientCounters)
+ : CreateInternalServiceActor(
+ yqSharedResources,
+ credentialsProviderFactory,
+ protoConfig.GetPrivateApi(),
+ clientCounters
);
actorRegistrator(MakeInternalServiceActorId(), internal);
}
diff --git a/ydb/core/yq/libs/private_client/CMakeLists.txt b/ydb/core/yq/libs/private_client/CMakeLists.txt
index 734322131c4..0f5f23c5b4d 100644
--- a/ydb/core/yq/libs/private_client/CMakeLists.txt
+++ b/ydb/core/yq/libs/private_client/CMakeLists.txt
@@ -24,5 +24,6 @@ target_link_libraries(yq-libs-private_client PUBLIC
)
target_sources(yq-libs-private_client PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/internal_service.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/loopback_service.cpp
${CMAKE_SOURCE_DIR}/ydb/core/yq/libs/private_client/private_client.cpp
)
diff --git a/ydb/core/yq/libs/private_client/events.h b/ydb/core/yq/libs/private_client/events.h
new file mode 100644
index 00000000000..073a097435a
--- /dev/null
+++ b/ydb/core/yq/libs/private_client/events.h
@@ -0,0 +1,73 @@
+#pragma once
+
+#include "private_client.h"
+
+#include <library/cpp/actors/core/actor.h>
+#include <library/cpp/actors/core/event_local.h>
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+#include <ydb/core/yq/libs/events/event_subspace.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+#include <ydb/core/yq/libs/shared_resources/shared_resources.h>
+
+#include <ydb/core/yq/libs/control_plane_storage/proto/yq_internal.pb.h>
+
+namespace NYq {
+
+struct TEvInternalService {
+ // Event ids.
+ enum EEv : ui32 {
+ EvHealthCheckRequest = YqEventSubspaceBegin(NYq::TYqEventSubspace::InternalService),
+ EvHealthCheckResponse,
+ EvGetTaskRequest,
+ EvGetTaskResponse,
+ EvPingTaskRequest,
+ EvPingTaskResponse,
+ EvWriteResultRequest,
+ EvWriteResultResponse,
+ EvEnd,
+ };
+
+ static_assert(EvEnd <= YqEventSubspaceEnd(NYq::TYqEventSubspace::InternalService), "All events must be in their subspace");
+
+ template <class TProtoRequest, ui32 TEventType>
+ struct TInternalServiceRequestEvent : public NActors::TEventLocal<TInternalServiceRequestEvent<TProtoRequest, TEventType>, TEventType> {
+ TProtoRequest Request;
+ TInstant SentAt;
+ explicit TInternalServiceRequestEvent(const TProtoRequest& request)
+ : Request(request), SentAt(Now())
+ { }
+ };
+
+ using TEvHealthCheckRequest = TInternalServiceRequestEvent<Yq::Private::NodesHealthCheckRequest, EvHealthCheckRequest>;
+ using TEvGetTaskRequest = TInternalServiceRequestEvent<Yq::Private::GetTaskRequest, EvGetTaskRequest>;
+ using TEvPingTaskRequest = TInternalServiceRequestEvent<Yq::Private::PingTaskRequest, EvPingTaskRequest>;
+ using TEvWriteResultRequest = TInternalServiceRequestEvent<Yq::Private::WriteTaskResultRequest, EvWriteResultRequest>;
+
+ template <class TProtoResult, ui32 TEventType>
+ struct TInternalServiceResponseEvent : public NActors::TEventLocal<TInternalServiceResponseEvent<TProtoResult, TEventType>, TEventType> {
+ NYdb::TStatus Status;
+ TProtoResult Result;
+ explicit TInternalServiceResponseEvent(const TProtoResultInternalWrapper<TProtoResult>& wrappedResult) : Status(wrappedResult) {
+ if (wrappedResult.IsResultSet()) {
+ Result = wrappedResult.GetResult();
+ }
+ }
+ explicit TInternalServiceResponseEvent(const TString& errorMessage) : Status(NYdb::EStatus::INTERNAL_ERROR, {NYql::TIssue(errorMessage).SetCode(NYql::UNEXPECTED_ERROR, NYql::TSeverityIds::S_ERROR)}) {
+ }
+ explicit TInternalServiceResponseEvent(const TProtoResult& result) : Status(NYdb::EStatus::SUCCESS, NYql::TIssues()), Result(result) {
+ }
+ TInternalServiceResponseEvent(NYdb::EStatus statusCode, NYql::TIssues&& issues) : Status(statusCode, std::move(issues)) {
+ }
+ };
+
+ using TEvHealthCheckResponse = TInternalServiceResponseEvent<Yq::Private::NodesHealthCheckResult, EvHealthCheckResponse>;
+ using TEvGetTaskResponse = TInternalServiceResponseEvent<Yq::Private::GetTaskResult, EvGetTaskResponse>;
+ using TEvPingTaskResponse = TInternalServiceResponseEvent<Yq::Private::PingTaskResult, EvPingTaskResponse>;
+ using TEvWriteResultResponse = TInternalServiceResponseEvent<Yq::Private::WriteTaskResultResult, EvWriteResultResponse>;
+};
+
+NActors::TActorId MakeInternalServiceActorId();
+
+} /* NYq */
diff --git a/ydb/core/yq/libs/private_client/internal_service.h b/ydb/core/yq/libs/private_client/internal_service.h
index 5bad866dfe0..fc65c22abe3 100644
--- a/ydb/core/yq/libs/private_client/internal_service.h
+++ b/ydb/core/yq/libs/private_client/internal_service.h
@@ -1,5 +1,6 @@
#pragma once
+#include "events.h"
#include "private_client.h"
#include <library/cpp/actors/core/actor.h>
@@ -15,59 +16,6 @@
namespace NYq {
-struct TEvInternalService {
- // Event ids.
- enum EEv : ui32 {
- EvHealthCheckRequest = YqEventSubspaceBegin(NYq::TYqEventSubspace::InternalService),
- EvHealthCheckResponse,
- EvGetTaskRequest,
- EvGetTaskResponse,
- EvPingTaskRequest,
- EvPingTaskResponse,
- EvWriteResultRequest,
- EvWriteResultResponse,
- EvEnd,
- };
-
- static_assert(EvEnd <= YqEventSubspaceEnd(NYq::TYqEventSubspace::InternalService), "All events must be in their subspace");
-
- template <class TProtoRequest, ui32 TEventType>
- struct TInternalServiceRequestEvent : public NActors::TEventLocal<TInternalServiceRequestEvent<TProtoRequest, TEventType>, TEventType> {
- TProtoRequest Request;
- TInstant SentAt;
- explicit TInternalServiceRequestEvent(const TProtoRequest& request)
- : Request(request), SentAt(Now())
- { }
- };
-
- using TEvHealthCheckRequest = TInternalServiceRequestEvent<Yq::Private::NodesHealthCheckRequest, EvHealthCheckRequest>;
- using TEvGetTaskRequest = TInternalServiceRequestEvent<Yq::Private::GetTaskRequest, EvGetTaskRequest>;
- using TEvPingTaskRequest = TInternalServiceRequestEvent<Yq::Private::PingTaskRequest, EvPingTaskRequest>;
- using TEvWriteResultRequest = TInternalServiceRequestEvent<Yq::Private::WriteTaskResultRequest, EvWriteResultRequest>;
-
- template <class TProtoResult, ui32 TEventType>
- struct TInternalServiceResponseEvent : public NActors::TEventLocal<TInternalServiceResponseEvent<TProtoResult, TEventType>, TEventType> {
- NYdb::TStatus Status;
- TProtoResult Result;
- TInternalServiceResponseEvent(const TProtoResultInternalWrapper<TProtoResult>& wrappedResult) : Status(wrappedResult)
- {
- if (wrappedResult.IsResultSet()) {
- Result = wrappedResult.GetResult();
- }
- }
- TInternalServiceResponseEvent(const TString& errorMessage) : Status(NYdb::EStatus::INTERNAL_ERROR, {NYql::TIssue(errorMessage).SetCode(NYql::UNEXPECTED_ERROR, NYql::TSeverityIds::S_ERROR)})
- {
- }
- };
-
- using TEvHealthCheckResponse = TInternalServiceResponseEvent<Yq::Private::NodesHealthCheckResult, EvHealthCheckResponse>;
- using TEvGetTaskResponse = TInternalServiceResponseEvent<Yq::Private::GetTaskResult, EvGetTaskResponse>;
- using TEvPingTaskResponse = TInternalServiceResponseEvent<Yq::Private::PingTaskResult, EvPingTaskResponse>;
- using TEvWriteResultResponse = TInternalServiceResponseEvent<Yq::Private::WriteTaskResultResult, EvWriteResultResponse>;
-};
-
-NActors::TActorId MakeInternalServiceActorId();
-
NActors::IActor* CreateInternalServiceActor(
const NYq::TYqSharedResources::TPtr& yqSharedResources,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
diff --git a/ydb/core/yq/libs/private_client/loopback_service.cpp b/ydb/core/yq/libs/private_client/loopback_service.cpp
new file mode 100644
index 00000000000..0187ff44ded
--- /dev/null
+++ b/ydb/core/yq/libs/private_client/loopback_service.cpp
@@ -0,0 +1,105 @@
+#include "loopback_service.h"
+
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/log.h>
+
+#include <ydb/core/protos/services.pb.h>
+
+#include <ydb/core/yq/libs/control_plane_storage/control_plane_storage.h>
+#include <ydb/core/yq/libs/control_plane_storage/events/events.h>
+
+#define LOG_E(stream) \
+ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::FQ_INTERNAL_SERVICE, stream)
+#define LOG_W(stream) \
+ LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::FQ_INTERNAL_SERVICE, stream)
+#define LOG_I(stream) \
+ LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::FQ_INTERNAL_SERVICE, stream)
+#define LOG_D(stream) \
+ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::FQ_INTERNAL_SERVICE, stream)
+
+namespace NYq {
+
+class TLoopbackService : public NActors::TActorBootstrapped<TLoopbackService> {
+public:
+ TLoopbackService(
+ const NMonitoring::TDynamicCounterPtr& counters)
+ : ServiceCounters(counters->GetSubgroup("subsystem", "LoopbackService"))
+ {
+ }
+
+ static constexpr char ActorName[] = "FQ_LOOPBACK_SERVICE";
+
+ void Bootstrap() {
+ Become(&TLoopbackService::StateFunc);
+ LOG_I("STARTED");
+ }
+
+private:
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvInternalService::TEvHealthCheckRequest, Handle)
+ hFunc(TEvInternalService::TEvGetTaskRequest, Handle)
+ hFunc(TEvInternalService::TEvPingTaskRequest, Handle)
+ hFunc(TEvInternalService::TEvWriteResultRequest, Handle)
+
+ hFunc(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse, Handle)
+ );
+
+ void Handle(TEvInternalService::TEvHealthCheckRequest::TPtr& ev) {
+ Cookie++;
+ Senders[Cookie] = ev->Sender;
+ auto request = ev->Get()->Request;
+ Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvNodesHealthCheckRequest(std::move(request)), 0, Cookie);
+ }
+
+ void Handle(NYq::TEvControlPlaneStorage::TEvNodesHealthCheckResponse::TPtr& ev) {
+ auto it = Senders.find(ev->Cookie);
+ if (it != Senders.end()) {
+ if (ev->Get()->Issues.Size() == 0) {
+ Send(it->second, new TEvInternalService::TEvHealthCheckResponse(ev->Get()->Record));
+ } else {
+ auto issues = ev->Get()->Issues;
+ Send(it->second, new TEvInternalService::TEvHealthCheckResponse(NYdb::EStatus::INTERNAL_ERROR, std::move(issues)));
+ }
+ Senders.erase(it);
+ }
+ }
+
+ void Handle(TEvInternalService::TEvGetTaskRequest::TPtr& /*ev*/) {
+ /*
+ Cookie++;
+ Senders[Cookie] = ev->Sender;
+ auto request = ev->Get()->Request;
+ Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvGetTaskRequest(std::move(request)), 0, Cookie);
+ */
+ }
+
+ void Handle(TEvInternalService::TEvPingTaskRequest::TPtr& /*ev*/) {
+ /*
+ Cookie++;
+ Senders[Cookie] = ev->Sender;
+ auto request = ev->Get()->Request;
+ Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvPingTaskRequest(std::move(request)), 0, Cookie);
+ */
+ }
+
+ void Handle(TEvInternalService::TEvWriteResultRequest::TPtr& /*ev*/) {
+ /*
+ Cookie++;
+ Senders[Cookie] = ev->Sender;
+ auto request = ev->Get()->Request;
+ Send(NYq::ControlPlaneStorageServiceActorId(), new NYq::TEvControlPlaneStorage::TEvWriteResultRequest(std::move(request)), 0, Cookie);
+ */
+ }
+
+ const NMonitoring::TDynamicCounterPtr ServiceCounters;
+ ui32 Cookie = 0;
+ THashMap<ui32, NActors::TActorId> Senders;
+};
+
+NActors::IActor* CreateLoopbackServiceActor(
+ const NMonitoring::TDynamicCounterPtr& counters) {
+ return new TLoopbackService(counters);
+}
+
+} /* NYq */
diff --git a/ydb/core/yq/libs/private_client/loopback_service.h b/ydb/core/yq/libs/private_client/loopback_service.h
new file mode 100644
index 00000000000..7bf15de7991
--- /dev/null
+++ b/ydb/core/yq/libs/private_client/loopback_service.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include "events.h"
+
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+namespace NYq {
+
+NActors::IActor* CreateLoopbackServiceActor(
+ const NMonitoring::TDynamicCounterPtr& counters);
+
+} /* NYq */