diff options
author | auzhegov <auzhegov@yandex-team.com> | 2023-06-26 19:26:09 +0300 |
---|---|---|
committer | auzhegov <auzhegov@yandex-team.com> | 2023-06-26 19:26:09 +0300 |
commit | bb35bb7376709e02fc037b9c1e93f5a416287950 (patch) | |
tree | c55c258fd959ca04299326b18de88db6711e2961 | |
parent | b90b4affbe58ed6d40eb861d8fcb1eff2801b016 (diff) | |
download | ydb-bb35bb7376709e02fc037b9c1e93f5a416287950.tar.gz |
&CreateConnection & CreateBinding YQv2
-rw-r--r-- | ydb/core/fq/libs/control_plane_proxy/config.cpp | 14 | ||||
-rw-r--r-- | ydb/core/fq/libs/control_plane_proxy/config.h | 10 | ||||
-rw-r--r-- | ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp | 685 | ||||
-rw-r--r-- | ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h | 12 | ||||
-rw-r--r-- | ydb/core/fq/libs/control_plane_proxy/events/events.h | 2 | ||||
-rw-r--r-- | ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp | 28 | ||||
-rw-r--r-- | ydb/core/fq/libs/init/init.cpp | 10 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_fq.cpp | 5 |
8 files changed, 740 insertions, 26 deletions
diff --git a/ydb/core/fq/libs/control_plane_proxy/config.cpp b/ydb/core/fq/libs/control_plane_proxy/config.cpp index f71aaf7fc0..0a2a63fe29 100644 --- a/ydb/core/fq/libs/control_plane_proxy/config.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/config.cpp @@ -30,12 +30,20 @@ NConfig::TControlPlaneProxyConfig FillDefaultParameters(NConfig::TControlPlanePr } -TControlPlaneProxyConfig::TControlPlaneProxyConfig(const NConfig::TControlPlaneProxyConfig& config) +TControlPlaneProxyConfig::TControlPlaneProxyConfig( + const NConfig::TControlPlaneProxyConfig& config, + const NConfig::TComputeConfig& computeConfig, + const NConfig::TCommonConfig& commonConfig) : Proto(FillDefaultParameters(config)) + , ComputeConfig(computeConfig) + , CommonConfig(commonConfig) , RequestTimeout(GetDuration(Proto.GetRequestTimeout(), TDuration::Seconds(30))) , MetricsTtl(GetDuration(Proto.GetMetricsTtl(), TDuration::Days(1))) - , ConfigRetryPeriod(GetDuration(Proto.GetConfigRetryPeriod(), TDuration::MilliSeconds(100))) -{ + , ConfigRetryPeriod( + GetDuration(Proto.GetConfigRetryPeriod(), TDuration::MilliSeconds(100))) { } + +bool TControlPlaneProxyConfig::IsYDBComputeEngineEnabled() const { + return ComputeConfig.HasYdb() && ComputeConfig.GetYdb().GetEnable(); } } // NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/config.h b/ydb/core/fq/libs/control_plane_proxy/config.h index bb108628f0..32f09af9e4 100644 --- a/ydb/core/fq/libs/control_plane_proxy/config.h +++ b/ydb/core/fq/libs/control_plane_proxy/config.h @@ -1,5 +1,7 @@ #pragma once +#include "ydb/core/fq/libs/config/protos/common.pb.h" +#include "ydb/core/fq/libs/config/protos/compute.pb.h" #include <ydb/core/fq/libs/config/protos/control_plane_proxy.pb.h> #include <util/datetime/base.h> @@ -8,11 +10,17 @@ namespace NFq { struct TControlPlaneProxyConfig { NConfig::TControlPlaneProxyConfig Proto; + NConfig::TComputeConfig ComputeConfig; + NConfig::TCommonConfig CommonConfig; TDuration RequestTimeout; TDuration MetricsTtl; TDuration ConfigRetryPeriod; - TControlPlaneProxyConfig(const NConfig::TControlPlaneProxyConfig& config); + TControlPlaneProxyConfig( + const NConfig::TControlPlaneProxyConfig& config, + const NConfig::TComputeConfig& computeConfig, + const NConfig::TCommonConfig& commonConfig); + bool IsYDBComputeEngineEnabled() const; }; } // NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp index 088fc97da9..3de9358c71 100644 --- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp @@ -45,6 +45,9 @@ #include <ydb/library/folder_service/folder_service.h> #include <ydb/library/folder_service/events.h> +#include <contrib/libs/fmt/include/fmt/format.h> + +#include <util/string/join.h> namespace NFq { namespace { @@ -321,7 +324,7 @@ public: Counters->Error->Inc(); CPP_LOG_E(errorMessage); NYql::TIssues issues; - NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Resolve subject type error: "); + NYql::TIssue issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Resolve subject type error"); issues.AddIssue(issue); Counters->Error->Inc(); const TDuration delta = TInstant::Now() - StartTime; @@ -364,6 +367,605 @@ private: } }; +TString EncloseAndEscapeString(const TString& value, char enclosingChar) { + auto escapedValue = value; + SubstGlobal( + escapedValue, TString{enclosingChar}, TStringBuilder{} << '\\' << enclosingChar); + return TStringBuilder{} << enclosingChar << escapedValue << enclosingChar; +} + +class TCreateConnectionInYDBActor : + public NActors::TActorBootstrapped<TCreateConnectionInYDBActor> { + struct TEvPrivate { + enum EEv { + EvCreateSessionResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvCreateConnectionExecutionResponse, + EvEnd + }; + + static_assert( + EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), + "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + + struct TEvCreateSessionResponse : + NActors::TEventLocal<TEvCreateSessionResponse, EvCreateSessionResponse> { + TAsyncCreateSessionResult Result; + + TEvCreateSessionResponse(TAsyncCreateSessionResult result) + : Result(std::move(result)) { } + }; + + struct TEvCreateConnectionExecutionResponse : + NActors::TEventLocal<TEvCreateConnectionExecutionResponse, EvCreateConnectionExecutionResponse> { + TAsyncStatus Result; + + TEvCreateConnectionExecutionResponse(TAsyncStatus result) + : Result(std::move(result)) { } + }; + }; + + using TBase = NActors::TActorBootstrapped<TCreateConnectionInYDBActor>; + using TBase::Become; + using TBase::PassAway; + using TBase::Register; + using TBase::SelfId; + using TBase::Send; + using IRetryPolicy = + IRetryPolicy<NCloud::TEvAccessService::TEvAuthenticateResponse::TPtr&>; + using TTableClientPtr = std::unique_ptr<NYdb::NTable::TTableClient>; + + using TEventRequest = TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr; + using TResponce = TEvControlPlaneProxy::TEvCreateConnectionResponse; + + TActorId Sender; + TRequestCommonCountersPtr Counters; + TEventRequest Event; + ui32 Cookie; + TDuration RequestTimeout; + TInstant StartTime; + TTableClientPtr TableClient; + TString ObjectStorageEndpoint; + +public: + TCreateConnectionInYDBActor( + const TRequestCommonCountersPtr& counters, + const NConfig::TYdbCompute& ydbComputeConfig, + const TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, + const TString& objectStorageEndpoint, + TActorId sender, + TEventRequest event, + ui32 cookie, + TDuration requestTimeout) + : Sender(sender) + , Counters(counters) + , Event(event) + , Cookie(cookie) + , RequestTimeout(requestTimeout) + , StartTime(TInstant::Now()) + , TableClient(CreateNewTableClient( + ydbComputeConfig, yqSharedResources, credentialsProviderFactory)) + , ObjectStorageEndpoint(objectStorageEndpoint) { } + + static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_CREATE_CONNECTION_IN_YDB"; + + void Bootstrap() { + CPP_LOG_T("Create connection in YDB. Actor id: " << SelfId()); + + if (auto issues = ValidateRequest(Event->Get()->Request); !issues.Empty()) { + NYql::TIssue issue = MakeErrorIssue( + TIssuesIds::INTERNAL_ERROR, "CreateConnectionRequest is not valid"); + for (auto& subIssue : issues) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); + } + SendErrorMessageToSender(std::move(issue)); + return; + } + + Become( + &TCreateConnectionInYDBActor::StateFunc, + RequestTimeout, + new NActors::TEvents::TEvWakeup()); + Counters->InFly->Inc(); + InitiateConnectionCreation(); + } + + NYql::TIssues ValidateRequest(const FederatedQuery::CreateConnectionRequest& request) { + NYql::TIssues issues; + if (request.content().name().Contains('/')) { + issues.AddIssue(MakeErrorIssue( + TIssuesIds::INTERNAL_ERROR, "'/' is not allowed in connection name")); + } + return issues; + } + + void InitiateConnectionCreation() { + auto request = Event->Get()->Request; + TableClient->CreateSession().Subscribe( + [actorSystem = NActors::TActivationContext::ActorSystem(), + self = SelfId()](const TAsyncCreateSessionResult& future) { + actorSystem->Send( + self, new TEvPrivate::TEvCreateSessionResponse(std::move(future))); + }); + } + + void Handle(TEvPrivate::TEvCreateSessionResponse::TPtr& event) { + using namespace fmt::literals; + auto createSessionResult = event->Get()->Result.GetValueSync(); + if (!createSessionResult.IsSuccess()) { + TString errorMessage = TStringBuilder{} + << "Couldn't create YDB session. Status" + << createSessionResult.GetStatus(); + CPP_LOG_E(errorMessage); + + NYql::TIssue issue = + MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Couldn't create YDB session"); + for (auto& subIssue : createSessionResult.GetIssues()) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); + } + SendErrorMessageToSender(std::move(issue)); + return; + } + + auto request = Event->Get()->Request; + auto session = createSessionResult.GetSession(); + auto bucketName = request.content().setting().object_storage().bucket(); + SubstGlobal(bucketName, TString{'"'}, "\\\""); + session + .ExecuteSchemeQuery(fmt::format( + R"( + CREATE EXTERNAL DATA SOURCE {external_source} WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + )", + "external_source"_a = EncloseAndEscapeString(request.content().name(), '`'), + "location"_a = ObjectStorageEndpoint + "/" + bucketName + "/")) + .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), + self = SelfId()](const TAsyncStatus& future) { + actorSystem->Send( + self, + new TEvPrivate::TEvCreateConnectionExecutionResponse(std::move(future))); + }); + } + + void Handle(TEvPrivate::TEvCreateConnectionExecutionResponse::TPtr& event) { + Counters->InFly->Dec(); + Counters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); + + const auto& status = event->Get()->Result.GetValueSync(); + if (!status.IsSuccess()) { + TString errorMessage; + if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) { + errorMessage = "External data source with such name already exists"; + } else { + errorMessage = TStringBuilder{} + << "Couldn't create external data source in YDB. Status" + << status.GetStatus(); + } + + CPP_LOG_E(errorMessage); + + NYql::TIssue issue = MakeErrorIssue( + TIssuesIds::INTERNAL_ERROR, "Couldn't create external data source in YDB"); + for (auto& subIssue : status.GetIssues()) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); + } + SendErrorMessageToSender(std::move(issue)); + return; + } + + CPP_LOG_T("External data source in YDB was successfully created"); + Counters->Ok->Inc(); + Event->Get()->ComputeYDBOperationWasPerformed = true; + + TActivationContext::Send(Event->Forward(ControlPlaneProxyActorId())); + PassAway(); + } + + void PassAway() override { TActor::PassAway(); } + + STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout); + hFunc(TEvPrivate::TEvCreateSessionResponse, Handle); + hFunc(TEvPrivate::TEvCreateConnectionExecutionResponse, Handle);) + + void HandleTimeout() { + CPP_LOG_D( + "Timeout occurred while creating external data source in YDB. Actor id: " + << SelfId()); + Counters->Timeout->Inc(); + SendErrorMessageToSender(MakeErrorIssue( + TIssuesIds::TIMEOUT, + "Timeout occurred while creating external data source in YDB. Try repeating the request later")); + } + + void SendErrorMessageToSender(NYql::TIssue issue) { + Counters->Error->Inc(); + NYql::TIssues issues; + issues.AddIssue(issue); + Send( + Sender, + new TEvControlPlaneProxy::TEvCreateConnectionResponse(issues, {}), + 0, + Cookie); // Change to template + PassAway(); + } + +private: + static TTableClientPtr CreateNewTableClient( + const NConfig::TYdbCompute& ydbComputeConfig, + const TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) { + auto tableSettigns = GetClientSettings<NYdb::NTable::TClientSettings>( + ydbComputeConfig.connection(), credentialsProviderFactory); + return std::make_unique<NYdb::NTable::TTableClient>( + yqSharedResources->UserSpaceYdbDriver, tableSettigns); + } +}; + +class TCreateBindingInYDBActor : + public NActors::TActorBootstrapped<TCreateBindingInYDBActor> { + struct TEvPrivate { + enum EEv { + EvCreateSessionResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvCreateBindingExecutionResponse, + EvEnd + }; + + static_assert( + EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), + "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + + struct TEvCreateSessionResponse : + NActors::TEventLocal<TEvCreateSessionResponse, EvCreateSessionResponse> { + TAsyncCreateSessionResult Result; + + TEvCreateSessionResponse(TAsyncCreateSessionResult result) + : Result(std::move(result)) { } + }; + + struct TEvCreateBindingExecutionResponse : + NActors::TEventLocal<TEvCreateBindingExecutionResponse, EvCreateBindingExecutionResponse> { + TAsyncStatus Result; + + TEvCreateBindingExecutionResponse(TAsyncStatus result) + : Result(std::move(result)) { } + }; + }; + + using TBase = NActors::TActorBootstrapped<TCreateBindingInYDBActor>; + using TBase::Become; + using TBase::PassAway; + using TBase::Register; + using TBase::SelfId; + using TBase::Send; + using IRetryPolicy = + IRetryPolicy<NCloud::TEvAccessService::TEvAuthenticateResponse::TPtr&>; + using TTableClientPtr = std::unique_ptr<NYdb::NTable::TTableClient>; + + using TEventRequest = TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr; + using TResponce = TEvControlPlaneProxy::TEvCreateBindingResponse; + using TResponseProxy = TEvControlPlaneProxy::TEvCreateBindingResponse; + + TActorId Sender; + TRequestCommonCountersPtr Counters; + TEventRequest Event; + ui32 Cookie; + TInstant StartTime; + TPermissions Permissions; + TDuration RequestTimeout; + TTableClientPtr TableClient; + TString ConnectionName; + +public: + TCreateBindingInYDBActor( + const TRequestCommonCountersPtr& counters, + const NConfig::TYdbCompute& ydbComputeConfig, + const TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, + TActorId sender, + TEventRequest event, + ui32 cookie, + TPermissions permissions, + TDuration requestTimeout) + : Sender(sender) + , Counters(counters) + , Event(event) + , Cookie(cookie) + , StartTime(TInstant::Now()) + , Permissions(std::move(permissions)) + , RequestTimeout(requestTimeout) + , TableClient(CreateNewTableClient( + ydbComputeConfig, yqSharedResources, credentialsProviderFactory)) { } + + static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_CREATE_BINDING_IN_YDB"; + + void Bootstrap() { + CPP_LOG_T("Create external table in YDB. Actor id: " << SelfId()); + + if (auto issues = ValidateRequest(Event->Get()->Request); !issues.Empty()) { + NYql::TIssue issue = MakeErrorIssue( + TIssuesIds::INTERNAL_ERROR, "CreateBindingRequest is not valid"); + for (auto& subIssue : issues) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); + } + SendErrorMessageToSender(std::move(issue)); + return; + } + + Become( + &TCreateBindingInYDBActor::StateFunc, + RequestTimeout, + new NActors::TEvents::TEvWakeup()); + Counters->InFly->Inc(); + ResolveConnectionId(); + } + + void ResolveConnectionId() { + FederatedQuery::DescribeConnectionRequest request; + auto connectionId = Event->Get()->Request.content().connection_id(); + request.set_connection_id(connectionId); + CPP_LOG_T( + "Create external table in YDB. Resolving connection id. Actor id: " + << SelfId() << " connection_id: " << connectionId); + auto event = new TEvControlPlaneStorage::TEvDescribeConnectionRequest( + "yandexcloud://" + Event->Get()->FolderId, + request, + Event->Get()->User, + Event->Get()->Token, + Event->Get()->CloudId, + Permissions, + Event->Get()->Quotas, + Event->Get()->TenantInfo); + Send(ControlPlaneStorageServiceActorId(), event); + } + + void Handle(TEvControlPlaneStorage::TEvDescribeConnectionResponse::TPtr& event) { + const auto& issues = event->Get()->Issues; + if (!issues.Empty()) { + CPP_LOG_E( + "Couldn't resolve connection id. Actor id: " << SelfId() << " Status: " + << issues.ToOneLineString()); + + NYql::TIssue issue = + MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Couldn't resolve connection id"); + for (auto& subIssue : issues) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); + } + SendErrorMessageToSender(std::move(issue)); + return; + } + + ConnectionName = event->Get()->Result.connection().content().name(); + CPP_LOG_T( + "Create external table in YDB. Resolved connection name. Actor id: " + << SelfId() << " Connection name: " << ConnectionName); + InitiateConnectionCreation(); + } + + void InitiateConnectionCreation() { + auto request = Event->Get()->Request; + TableClient->CreateSession().Subscribe( + [actorSystem = NActors::TActivationContext::ActorSystem(), + self = SelfId()](const TAsyncCreateSessionResult& future) { + actorSystem->Send( + self, new TEvPrivate::TEvCreateSessionResponse(std::move(future))); + }); + } + + void Handle(TEvPrivate::TEvCreateSessionResponse::TPtr& event) { + auto createSessionResult = event->Get()->Result.GetValueSync(); + if (!createSessionResult.IsSuccess()) { + CPP_LOG_E( + "Couldn't create YDB session. Actor id: " + << SelfId() << " Status: " << createSessionResult.GetStatus()); + + NYql::TIssue issue = + MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Couldn't create YDB session"); + for (auto& subIssue : createSessionResult.GetIssues()) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); + } + SendErrorMessageToSender(std::move(issue)); + return; + } + + auto session = createSessionResult.GetSession(); + auto query = CreateSchemaQuery(Event->Get()->Request); + CPP_LOG_T( + "Create external table in YDB. Actor id: " << SelfId() << " Query: " << query); + session.ExecuteSchemeQuery(query).Subscribe( + [actorSystem = NActors::TActivationContext::ActorSystem(), + self = SelfId()](const TAsyncStatus& future) { + actorSystem->Send( + self, new TEvPrivate::TEvCreateBindingExecutionResponse(future)); + }); + } + + NYql::TIssues ValidateRequest(const FederatedQuery::CreateBindingRequest& request) { + NYql::TIssues issues; + if (request.content().setting().binding_case() != + FederatedQuery::BindingSetting::BindingCase::kObjectStorage) { + issues.AddIssue( + MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Unsupported binding type")); + } + if (request.content().setting().object_storage().subset().size() != 1) { + issues.AddIssue(MakeErrorIssue( + TIssuesIds::INTERNAL_ERROR, + "Cannot create external table due to wrong amount of subsets in request")); + } + if (request.content().name().Contains('/')) { + issues.AddIssue(MakeErrorIssue( + TIssuesIds::INTERNAL_ERROR, "'/' is not allowed in binding name")); + } + return issues; + } + + TString CreateSchemaQuery(const FederatedQuery::CreateBindingRequest& request) { + using namespace fmt::literals; + + auto bindingName = request.content().name(); + auto objectStorageParams = request.content().setting().object_storage(); + const auto& subset = objectStorageParams.subset(0); + + // Schema + auto columnsTransformFunction = [](const Ydb::Column& column) -> TString { + return fmt::format( + " {columnName} {columnType}", + "columnName"_a = EncloseAndEscapeString(column.name(), '`'), + "columnType"_a = + NYdb::TType{column.type()} + .ToString()); // TODO: check if this conversion could lead to valnurability + }; + auto columnsBegin = + MakeMappedIterator(subset.schema().column().begin(), columnsTransformFunction); + auto columnsEnd = + MakeMappedIterator(subset.schema().column().end(), columnsTransformFunction); + + // WithOptiuons + auto withOptions = std::unordered_map<TString, TString>{}; + withOptions.insert( + {"DATA_SOURCE", TStringBuilder{} << '"' << ConnectionName << '"'}); + withOptions.insert({"LOCATION", EncloseAndEscapeString(subset.path_pattern(), '"')}); + if (!subset.format().Empty()) { + withOptions.insert({"FORMAT", EncloseAndEscapeString(subset.format(), '"')}); + } + if (!subset.compression().Empty()) { + withOptions.insert( + {"COMPRESSION", EncloseAndEscapeString(subset.compression(), '"')}); + } + for (auto& kv : subset.format_setting()) { + withOptions.insert( + {EncloseAndEscapeString(kv.first, '"'), + EncloseAndEscapeString(kv.second, '"')}); + }; + + if (!subset.partitioned_by().empty()) { + auto stringEscapeMapper = [](const TString& value) { + return EncloseAndEscapeString(value, '"'); + }; + + auto partitionBy = + TStringBuilder{} + << '[' + << JoinRange( + ", ", + MakeMappedIterator(subset.partitioned_by().begin(), stringEscapeMapper), + MakeMappedIterator(subset.partitioned_by().end(), stringEscapeMapper)) + << ']'; + withOptions.insert({"PARTITIONED_BY", partitionBy}); + } + + if (!subset.projection().empty()) { + auto keyValueToStringMapper = + [](const std::pair<TString, TString>& kv) -> TString { + return fmt::format( + " {propertyName}={propertyValue}", + "propertyName"_a = EncloseAndEscapeString(kv.first, '"'), + "propertyValue"_a = EncloseAndEscapeString(kv.second, '"')); + }; + + auto projection = + TStringBuilder{} + << "{\n" + << JoinRange( + ",\n", + MakeMappedIterator(subset.projection().begin(), keyValueToStringMapper), + MakeMappedIterator(subset.projection().end(), keyValueToStringMapper)) + << "\n}"; + withOptions.insert({"PROJECTION", projection}); + } + + auto concatEscapedKeyValueMapper = + [](const std::pair<TString, TString>& kv) -> TString { + return TStringBuilder{} << " " << kv.first << " = " << kv.second; + }; + + auto withOptionsBegin = + MakeMappedIterator(withOptions.begin(), concatEscapedKeyValueMapper); + auto withOptionsEnd = + MakeMappedIterator(withOptions.end(), concatEscapedKeyValueMapper); + + return fmt::format( + R"( + CREATE EXTERNAL TABLE {externalTableName} ( + {columns} + ) WITH ( + {withOptions} + );)", + "externalTableName"_a = EncloseAndEscapeString(bindingName, '`'), + "columns"_a = JoinRange(",\n", columnsBegin, columnsEnd), + "withOptions"_a = JoinRange(",\n", withOptionsBegin, withOptionsEnd)); + } + + void Handle(TEvPrivate::TEvCreateBindingExecutionResponse::TPtr& event) { + Counters->InFly->Dec(); + Counters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); + + const auto& status = event->Get()->Result.GetValueSync(); + if (!status.IsSuccess()) { + TString errorMessage; + if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) { + errorMessage = "External table with such name already exists"; + } else { + errorMessage = TStringBuilder{} + << "Couldn't create external table in YDB. Status: " + << status.GetStatus(); + } + + CPP_LOG_E(errorMessage); + + NYql::TIssue issue = MakeErrorIssue( + TIssuesIds::INTERNAL_ERROR, "Couldn't create external table in YDB"); + for (auto& subIssue : status.GetIssues()) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); + } + SendErrorMessageToSender(std::move(issue)); + return; + } + + Counters->Ok->Inc(); + Event->Get()->ComputeYDBOperationWasPerformed = true; + CPP_LOG_T("External table in YDB was successfully created"); + + TActivationContext::Send(Event->Forward(ControlPlaneProxyActorId())); + PassAway(); + } + + STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, HandleTimeout); + hFunc(TEvControlPlaneStorage::TEvDescribeConnectionResponse, Handle); + hFunc(TEvPrivate::TEvCreateSessionResponse, Handle); + hFunc(TEvPrivate::TEvCreateBindingExecutionResponse, Handle);) + + void HandleTimeout() { + CPP_LOG_D("Create external table in YDB timeout. Actor id: " << SelfId()); + Counters->Timeout->Inc(); + SendErrorMessageToSender(MakeErrorIssue( + TIssuesIds::TIMEOUT, + "Create external table in YDB timeout. Try repeating the request later")); + } + + void SendErrorMessageToSender(NYql::TIssue issue) { + Counters->Error->Inc(); + NYql::TIssues issues; + issues.AddIssue(issue); + Send(Sender, new TResponseProxy(issues, {}), 0, + Cookie); // Change to template + PassAway(); + } + +private: + static TTableClientPtr CreateNewTableClient( + const NConfig::TYdbCompute& ydbComputeConfig, + const TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) { + auto tableSettigns = GetClientSettings<NYdb::NTable::TClientSettings>( + ydbComputeConfig.connection(), credentialsProviderFactory); + return std::make_unique<NYdb::NTable::TTableClient>( + yqSharedResources->UserSpaceYdbDriver, tableSettigns); + } +}; + template<class TEventRequest, class TResponseProxy> class TResolveFolderActor : public NActors::TActorBootstrapped<TResolveFolderActor<TEventRequest, TResponseProxy>> { using TBase = NActors::TActorBootstrapped<TResolveFolderActor<TEventRequest, TResponseProxy>>; @@ -762,6 +1364,8 @@ class TControlPlaneProxyActor : public NActors::TActorBootstrapped<TControlPlane RTC_MODIFY_BINDING, RTC_DELETE_BINDING, RTC_RESOLVE_SUBJECT_TYPE, + RTC_CREATE_CONNECTION_IN_YDB, + RTC_CREATE_BINDING_IN_YDB, RTC_MAX, }; @@ -809,6 +1413,8 @@ class TControlPlaneProxyActor : public NActors::TActorBootstrapped<TControlPlane { MakeIntrusive<TRequestCommonCounters>("ModifyBinding") }, { MakeIntrusive<TRequestCommonCounters>("DeleteBinding") }, { MakeIntrusive<TRequestCommonCounters>("ResolveSubjectType") }, + { MakeIntrusive<TRequestCommonCounters>("CreateConnectionInYDB") }, + { MakeIntrusive<TRequestCommonCounters>("CreateBindingInYDB") } }); TTtlCache<TMetricsScope, TScopeCountersPtr, TMap> ScopeCounters{TTtlCacheSettings{}.SetTtl(TDuration::Days(1))}; @@ -878,16 +1484,25 @@ class TControlPlaneProxyActor : public NActors::TActorBootstrapped<TControlPlane TCounters Counters; const ::NFq::TControlPlaneProxyConfig Config; + const TYqSharedResources::TPtr YqSharedResources; + const NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; const bool QuotaManagerEnabled; TActorId AccessService; public: - TControlPlaneProxyActor(const NConfig::TControlPlaneProxyConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters, bool quotaManagerEnabled) + TControlPlaneProxyActor( + const NConfig::TControlPlaneProxyConfig& config, + const NConfig::TComputeConfig& computeConfig, + const NConfig::TCommonConfig& commonConfig, + const TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, + const ::NMonitoring::TDynamicCounterPtr& counters, + bool quotaManagerEnabled) : Counters(counters) - , Config(config) - , QuotaManagerEnabled(quotaManagerEnabled) - { - } + , Config(config, computeConfig, commonConfig) + , YqSharedResources(yqSharedResources) + , CredentialsProviderFactory(credentialsProviderFactory) + , QuotaManagerEnabled(quotaManagerEnabled) { } static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY"; @@ -1659,6 +2274,7 @@ private: const TString cloudId = ev->Get()->CloudId; const TString folderId = ev->Get()->FolderId; const TString subjectType = ev->Get()->SubjectType; + const bool ydbOperationWasPerformed = ev->Get()->ComputeYDBOperationWasPerformed; const TString scope = "yandexcloud://" + folderId; TString user = ev->Get()->User; TString token = ev->Get()->Token; @@ -1709,6 +2325,20 @@ private: TPermissions::TPermission::MANAGE_PUBLIC }; + if (Config.IsYDBComputeEngineEnabled() && !ydbOperationWasPerformed) { + Register(new TCreateConnectionInYDBActor( + Counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB), + Config.ComputeConfig.GetYdb(), + YqSharedResources, + CredentialsProviderFactory, + Config.CommonConfig.GetObjectStorageEndpoint(), + sender, + ev, + cookie, + Config.RequestTimeout)); + return; + } + Register(new TRequestActor<FederatedQuery::CreateConnectionRequest, TEvControlPlaneStorage::TEvCreateConnectionRequest, TEvControlPlaneStorage::TEvCreateConnectionResponse, @@ -2059,6 +2689,7 @@ private: const TString cloudId = ev->Get()->CloudId; const TString folderId = ev->Get()->FolderId; const TString subjectType = ev->Get()->SubjectType; + const bool ydbOperationWasPerformed = ev->Get()->ComputeYDBOperationWasPerformed; const TString scope = "yandexcloud://" + folderId; TString user = ev->Get()->User; TString token = ev->Get()->Token; @@ -2080,7 +2711,10 @@ private: } TRequestCounters requestCounters = Counters.GetCounters(cloudId, scope, RTS_CREATE_BINDING, RTC_CREATE_BINDING); - NYql::TIssues issues = ValidatePermissions(ev, {"yq.bindings.create@as"}); + + auto requiredParams = TVector<TString>{"yq.bindings.create@as", "yq.connections.get@as"}; + + NYql::TIssues issues = ValidatePermissions(ev, requiredParams); if (issues) { CPS_LOG_E("CreateBindingRequest, validation failed: " << scope << " " << user << " " << NKikimr::MaskTicket(token) << " " << request.DebugString() << " error: " << issues.ToString()); Send(ev->Sender, new TEvControlPlaneProxy::TEvCreateBindingResponse(issues, subjectType), 0, ev->Cookie); @@ -2101,9 +2735,26 @@ private: } static const TPermissions availablePermissions { - TPermissions::TPermission::MANAGE_PUBLIC + TPermissions::TPermission::VIEW_PUBLIC + | TPermissions::TPermission::MANAGE_PUBLIC + | TPermissions::TPermission::MANAGE_PRIVATE }; + if (Config.IsYDBComputeEngineEnabled() && !ydbOperationWasPerformed) { + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(new TCreateBindingInYDBActor( + Counters.GetCommonCounters(RTC_CREATE_BINDING_IN_YDB), + Config.ComputeConfig.GetYdb(), + YqSharedResources, + CredentialsProviderFactory, + sender, + ev, + cookie, + std::move(permissions), + Config.RequestTimeout)); + return; + } + Register(new TRequestActor<FederatedQuery::CreateBindingRequest, TEvControlPlaneStorage::TEvCreateBindingRequest, TEvControlPlaneStorage::TEvCreateBindingResponse, @@ -2398,8 +3049,22 @@ TActorId ControlPlaneProxyActorId() { return NActors::TActorId(0, name); } -IActor* CreateControlPlaneProxyActor(const NConfig::TControlPlaneProxyConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters, bool quotaManagerEnabled) { - return new TControlPlaneProxyActor(config, counters, quotaManagerEnabled); +IActor* CreateControlPlaneProxyActor( + const NConfig::TControlPlaneProxyConfig& config, + const NConfig::TComputeConfig& computeConfig, + const NConfig::TCommonConfig& commonConfig, + const TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, + const ::NMonitoring::TDynamicCounterPtr& counters, + bool quotaManagerEnabled) { + return new TControlPlaneProxyActor( + config, + computeConfig, + commonConfig, + yqSharedResources, + credentialsProviderFactory, + counters, + quotaManagerEnabled); } } // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h index 5ceaa799a5..128f97aa9f 100644 --- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h +++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h @@ -1,7 +1,10 @@ #pragma once #include <ydb/core/fq/libs/actors/logging/log.h> +#include "ydb/core/fq/libs/config/protos/compute.pb.h" #include <ydb/core/fq/libs/config/protos/control_plane_proxy.pb.h> +#include <ydb/library/security/ydb_credentials_provider_factory.h> +#include <ydb/core/fq/libs/shared_resources/shared_resources.h> #include <library/cpp/actors/core/actor.h> #include <library/cpp/monlib/dynamic_counters/counters.h> @@ -22,6 +25,13 @@ namespace NFq { NActors::TActorId ControlPlaneProxyActorId(); -NActors::IActor* CreateControlPlaneProxyActor(const NConfig::TControlPlaneProxyConfig& config, const ::NMonitoring::TDynamicCounterPtr& counters, bool quotaManagerEnabled); +NActors::IActor* CreateControlPlaneProxyActor( + const NConfig::TControlPlaneProxyConfig& config, + const NConfig::TComputeConfig& computeConfig, + const NConfig::TCommonConfig& commonConfig, + const TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, + const ::NMonitoring::TDynamicCounterPtr& counters, + bool quotaManagerEnabled); } // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/events/events.h b/ydb/core/fq/libs/control_plane_proxy/events/events.h index 3cface33cd..e01117000d 100644 --- a/ydb/core/fq/libs/control_plane_proxy/events/events.h +++ b/ydb/core/fq/libs/control_plane_proxy/events/events.h @@ -79,6 +79,7 @@ struct TEvControlPlaneProxy { , Permissions(permissions) , Quotas(std::move(quotas)) , TenantInfo(tenantInfo) + , ComputeYDBOperationWasPerformed(false) { } @@ -91,6 +92,7 @@ struct TEvControlPlaneProxy { TMaybe<TQuotaMap> Quotas; TTenantInfo::TPtr TenantInfo; TString SubjectType; + bool ComputeYDBOperationWasPerformed; }; template<typename TDerived, typename ProtoMessage, ui32 EventType> diff --git a/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp b/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp index 225f1f73f0..031a75d99c 100644 --- a/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/ut/control_plane_proxy_ut.cpp @@ -89,6 +89,8 @@ public: struct TTestBootstrap { const TDuration RequestTimeout = TDuration::Seconds(10); NConfig::TControlPlaneProxyConfig Config; + NConfig::TComputeConfig ComputeConfig; + NConfig::TCommonConfig CommonConfig; TRuntimePtr Runtime; TGrabActor* MetaStorageGrab; @@ -379,7 +381,14 @@ private: TRuntimePtr runtime(new TTestBasicRuntime()); runtime->SetLogPriority(NKikimrServices::STREAMS_CONTROL_PLANE_SERVICE, NLog::PRI_DEBUG); - auto controlPlaneProxy = CreateControlPlaneProxyActor(Config, MakeIntrusive<::NMonitoring::TDynamicCounters>(), true); + auto controlPlaneProxy = CreateControlPlaneProxyActor( + Config, + ComputeConfig, + CommonConfig, + NFq::TYqSharedResources::TPtr{}, + NKikimr::TYdbCredentialsProviderFactory(nullptr), + MakeIntrusive<::NMonitoring::TDynamicCounters>(), + true); runtime->AddLocalService( ControlPlaneProxyActorId(), TActorSetupCmd(controlPlaneProxy, TMailboxType::Simple, 0)); @@ -1332,7 +1341,7 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyCheckPermissionsSuccess) { NConfig::TControlPlaneProxyConfig config; config.SetEnablePermissions(true); TTestBootstrap bootstrap(config); - bootstrap.SendCreateBindingRequest({"yq.bindings.create@as"}); + bootstrap.SendCreateBindingRequest({"yq.bindings.create@as", "yq.connections.get@as"}); auto request = bootstrap.MetaStorageGrab->GetRequest(); auto event = request->Get<TEvControlPlaneStorage::TEvCreateBindingRequest>(); auto permissions = event->Permissions; @@ -1896,6 +1905,9 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyCheckPermissionsControlPlaneStorageSuccess) config.SetEnablePermissions(true); TTestBootstrap bootstrap(config); bootstrap.SendCreateBindingRequest({ + "yq.connections.get@as", + "yq.resources.viewPublic@as", + "yq.resources.viewPrivate@as", "yq.bindings.create@as", "yq.resources.managePublic@as" }); @@ -1903,7 +1915,7 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyCheckPermissionsControlPlaneStorageSuccess) auto event = request->Get<TEvControlPlaneStorage::TEvCreateBindingRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); @@ -2691,11 +2703,11 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyCheckNegativePermissionsSuccess) { auto event = request->Get<TEvControlPlaneStorage::TEvCreateBindingRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); - UNIT_ASSERT(!permissions.Check(TPermissions::MANAGE_PRIVATE)); + UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::CONNECTIONS_USE)); UNIT_ASSERT(!permissions.Check(TPermissions::BINDINGS_USE)); UNIT_ASSERT(!permissions.Check(TPermissions::QUERY_INVOKE)); @@ -3157,11 +3169,11 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) { auto event = request->Get<TEvControlPlaneStorage::TEvCreateBindingRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); - UNIT_ASSERT(!permissions.Check(TPermissions::MANAGE_PRIVATE)); + UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::CONNECTIONS_USE)); UNIT_ASSERT(!permissions.Check(TPermissions::BINDINGS_USE)); UNIT_ASSERT(!permissions.Check(TPermissions::QUERY_INVOKE)); @@ -3609,7 +3621,7 @@ Y_UNIT_TEST_SUITE(TControlPlaneProxyShouldPassHids) { auto event = request->Get<TEvControlPlaneStorage::TEvCreateBindingRequest>(); auto permissions = event->Permissions; UNIT_ASSERT_VALUES_EQUAL(event->Scope, "yandexcloud://my_folder"); - UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PUBLIC)); + UNIT_ASSERT(permissions.Check(TPermissions::VIEW_PUBLIC)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_PRIVATE)); UNIT_ASSERT(!permissions.Check(TPermissions::VIEW_AST)); UNIT_ASSERT(permissions.Check(TPermissions::MANAGE_PUBLIC)); diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 1d730bfc19..20a6111275 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -96,8 +96,14 @@ void Init( } if (protoConfig.GetControlPlaneProxy().GetEnabled()) { - auto controlPlaneProxy = NFq::CreateControlPlaneProxyActor(protoConfig.GetControlPlaneProxy(), - yqCounters->GetSubgroup("subsystem", "ControlPlaneProxy"), protoConfig.GetQuotasManager().GetEnabled()); + auto controlPlaneProxy = NFq::CreateControlPlaneProxyActor( + protoConfig.GetControlPlaneProxy(), + protoConfig.GetCompute(), + protoConfig.GetCommon(), + yqSharedResources, + NKikimr::CreateYdbCredentialsProviderFactory, + yqCounters->GetSubgroup("subsystem", "ControlPlaneProxy"), + protoConfig.GetQuotasManager().GetEnabled()); actorRegistrator(NFq::ControlPlaneProxyActorId(), controlPlaneProxy); } diff --git a/ydb/core/grpc_services/rpc_fq.cpp b/ydb/core/grpc_services/rpc_fq.cpp index 63dbe33277..200347e2a6 100644 --- a/ydb/core/grpc_services/rpc_fq.cpp +++ b/ydb/core/grpc_services/rpc_fq.cpp @@ -590,7 +590,10 @@ std::unique_ptr<TEvProxyRuntimeEvent> CreateFederatedQueryCreateBindingRequestOp // so yq.resources.managePublic is always requested as optional return { NPerms::Required("yq.bindings.create"), - NPerms::Optional("yq.resources.managePublic") + NPerms::Required("yq.connections.get"), + NPerms::Optional("yq.resources.managePublic"), + NPerms::Optional("yq.resources.viewPublic"), + NPerms::Optional("yq.resources.viewPrivate") }; } }; |