diff options
author | hrustyashko <hrustyashko@yandex-team.ru> | 2022-03-28 17:10:15 +0300 |
---|---|---|
committer | hrustyashko <hrustyashko@yandex-team.ru> | 2022-03-28 17:10:15 +0300 |
commit | 1a84b3ff343ca0a6364fcc54ea40aee919354f43 (patch) | |
tree | 413cb5dbe29841f9860786bb9529b7bf4d054237 | |
parent | 3e9fdacb8297268fdf38f64d3bf4065f48a2d4d7 (diff) | |
download | ydb-1a84b3ff343ca0a6364fcc54ea40aee919354f43.tar.gz |
Получать IAM токен для трансформации
ref:6a690395ca2ca76ee82b4e4929559fa26278d834
8 files changed, 104 insertions, 22 deletions
diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h b/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h index 2b57e288ff..7912149f05 100644 --- a/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h +++ b/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h @@ -23,6 +23,8 @@ public: const NKikimr::NMiniKQL::THolderFactory& HolderFactory; const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder; + + const THashMap<TString, TString>& SecureParams; }; TDqTransformActorFactory(); diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index 36d73316fe..8e6c760ec3 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -137,7 +137,7 @@ enum ETransformType { message TDqTransform { ETransformType Type = 1; string FunctionName = 2; - string ConnectionName = 3; + optional string ConnectionName = 3; // binary YSON(codec ydb/library/yql/providers/common/schema/mkql) string InputType = 4; diff --git a/ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt b/ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt index 61a6a9637f..fc94136494 100644 --- a/ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt +++ b/ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt @@ -26,6 +26,8 @@ target_link_libraries(providers-cloud_function-actors PUBLIC yql-minikql-computation yql-public-issue yql-sql-pg_dummy + common-token_accessor-client + client-ydb_types-credentials ) target_sources(providers-cloud_function-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp index dc4534c9db..2c008cf057 100644 --- a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp +++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp @@ -55,6 +55,7 @@ const auto RETRY_POLICY = IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryB TCloudFunctionTransformActor::TCloudFunctionTransformActor(TActorId owner, NDqProto::TDqTransform transform, IHTTPGateway::TPtr gateway, + NYdb::TCredentialsProviderPtr credentialsProvider, IDqOutputChannel::TPtr transformInput, IDqOutputConsumer::TPtr taskOutput, const NKikimr::NMiniKQL::THolderFactory& holderFactory, @@ -64,6 +65,7 @@ TCloudFunctionTransformActor::TCloudFunctionTransformActor(TActorId owner, , Owner(owner) , Transform(transform) , Gateway(std::move(gateway)) +, CredentialsProvider(std::move(credentialsProvider)) , TransformInput(std::move(transformInput)) , TaskOutput(std::move(taskOutput)) , TransformName(UrlEscapeRet(transform.GetFunctionName(), true)) @@ -147,7 +149,10 @@ void TCloudFunctionTransformActor::Handle(TCFTransformEvent::TEvExecuteTransform TransformInProgress = true; auto* actorSystem = TActivationContext::ActorSystem(); - auto headers = IHTTPGateway::THeaders(); + const auto iamToken = CredentialsProvider->GetAuthInfo(); + auto headers = iamToken.empty() + ? IHTTPGateway::THeaders() + : IHTTPGateway::THeaders{TString("Authorization: ") + iamToken}; Gateway->Download(CLOUD_FUNCTION_BASE_URL + TransformName, headers, 10, std::bind(&TCloudFunctionTransformActor::OnInvokeFinished, actorSystem, SelfId(), context, @@ -305,9 +310,22 @@ TGuard<NKikimr::NMiniKQL::TScopedAlloc> TCloudFunctionTransformActor::BindAlloca return TypeEnv.BindAllocator(); } -std::pair<IDqTransformActor*, NActors::IActor*> CreateCloudFunctionTransformActor(const NDqProto::TDqTransform& transform, IHTTPGateway::TPtr gateway, TDqTransformActorFactory::TArguments&& args) { +std::pair<IDqTransformActor*, NActors::IActor*> CreateCloudFunctionTransformActor( + const NDqProto::TDqTransform& transform, IHTTPGateway::TPtr gateway, + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + TDqTransformActorFactory::TArguments&& args) { + + std::shared_ptr<NYdb::ICredentialsProviderFactory> credProviderFactory; + if (transform.HasConnectionName()) { + const auto token = args.SecureParams.Value(transform.GetConnectionName(), TString{}); + credProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token, true); + } else { + credProviderFactory = NYdb::CreateInsecureCredentialsProviderFactory(); + } + const auto actor = new TCloudFunctionTransformActor( - args.ComputeActorId, transform, gateway, + args.ComputeActorId, transform, + gateway, credProviderFactory->CreateProvider(), args.TransformInput, args.TransformOutput, args.HolderFactory, args.TypeEnv, args.ProgramBuilder); diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h index 85c9b9f073..a8402c7264 100644 --- a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h +++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h @@ -9,8 +9,10 @@ #include <ydb/library/yql/dq/runtime/dq_input_channel.h> #include <ydb/library/yql/dq/runtime/dq_output_channel.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> +#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_program_builder.h> +#include <ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> @@ -54,7 +56,9 @@ public: static constexpr char ActorName[] = "YQL_DQ_CLOUD_FUNC_TRANSFORM"; TCloudFunctionTransformActor(TActorId owner, - NDqProto::TDqTransform transform, IHTTPGateway::TPtr gateway, + NDqProto::TDqTransform transform, + IHTTPGateway::TPtr gateway, + NYdb::TCredentialsProviderPtr credentialsProvider, IDqOutputChannel::TPtr transformInput, IDqOutputConsumer::TPtr taskOutput, const NKikimr::NMiniKQL::THolderFactory& holderFactory, @@ -102,6 +106,8 @@ private: NDqProto::TDqTransform Transform; IHTTPGateway::TPtr Gateway; + NYdb::TCredentialsProviderPtr CredentialsProvider; + IDqOutputChannel::TPtr TransformInput; IDqOutputConsumer::TPtr TaskOutput; @@ -117,6 +123,9 @@ private: NMiniKQL::TType* OutputRowsType = nullptr; }; -std::pair<IDqTransformActor*, NActors::IActor*> CreateCloudFunctionTransformActor(const NDqProto::TDqTransform& transform, IHTTPGateway::TPtr gateway, TDqTransformActorFactory::TArguments&& args); +std::pair<IDqTransformActor*, NActors::IActor*> CreateCloudFunctionTransformActor( + const NDqProto::TDqTransform& transform, IHTTPGateway::TPtr gateway, + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, + TDqTransformActorFactory::TArguments&& args); }
\ No newline at end of file diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp index ab4d33e308..9e16301cfc 100644 --- a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp +++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp @@ -5,10 +5,11 @@ namespace NYql::NDq { -void RegisterTransformCloudFunction(TDqTransformActorFactory& factory, IHTTPGateway::TPtr gateway) { +void RegisterTransformCloudFunction(TDqTransformActorFactory& factory, IHTTPGateway::TPtr gateway, + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) { constexpr NDqProto::ETransformType type = NDqProto::ETransformType::TRANSFORM_CLOUD_FUNCTION; - factory.Register(type, [gateway](const NDqProto::TDqTransform& transform, TDqTransformActorFactory::TArguments&& args) { - return CreateCloudFunctionTransformActor(transform, gateway, std::move(args)); + factory.Register(type, [gateway, credentialsFactory](const NDqProto::TDqTransform& transform, TDqTransformActorFactory::TArguments&& args) { + return CreateCloudFunctionTransformActor(transform, gateway, credentialsFactory, std::move(args)); }); } diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h index d17f556c73..681fd1cde1 100644 --- a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h +++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h @@ -2,9 +2,11 @@ #include <ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h> #include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> +#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> namespace NYql::NDq { -void RegisterTransformCloudFunction(TDqTransformActorFactory& factory, IHTTPGateway::TPtr gateway); +void RegisterTransformCloudFunction(TDqTransformActorFactory& factory, IHTTPGateway::TPtr gateway, + ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory); }
\ No newline at end of file diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp index df3dfe2a9b..863bdba592 100644 --- a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp +++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp @@ -10,6 +10,7 @@ #include <ydb/library/yql/minikql/computation/mkql_value_builder.h> #include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h> #include <ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.h> +#include <ydb/library/yql/providers/common/token_accessor/client/factory.h> #include <thread> #include <chrono> @@ -27,9 +28,9 @@ using namespace std::chrono_literals; struct TTransformSystem { IDqOutputChannel::TPtr InputChannel; IDqOutputChannel::TPtr OutputChannel; - TCloudFunctionTransformActor* TransformActor; NActors::TActorId TransformActorId; NActors::TActorId ComputeActorId; + IHTTPMockGateway::TPtr Gateway; }; class TCloudFunctionTransformTest: public TTestBase { @@ -38,6 +39,7 @@ public: UNIT_TEST(TestEmptyChannel) UNIT_TEST(TestMultiplicationTransform) UNIT_TEST(TestSeveralIterations) + UNIT_TEST(TestPrivateFunction) UNIT_TEST_SUITE_END(); TCloudFunctionTransformTest() @@ -46,19 +48,24 @@ public: , FunctionRegistry(CreateFunctionRegistry(CreateBuiltinRegistry())) , ProgramBuilder(*TypeEnv, *FunctionRegistry) , MemInfo("Mem") - , Gateway(IHTTPMockGateway::Make()) { HolderFactory = std::make_unique<NMiniKQL::THolderFactory>(Alloc->Ref(), MemInfo); ValueBuilder = std::make_unique<TDefaultValueBuilder>(*HolderFactory); } - TTransformSystem StartUpTransformSystem(ui32 nodeId, TString transformName) { + TTransformSystem StartUpTransformSystem(ui32 nodeId, TString transformName, + TString connectionName = {}, THashMap<TString, TString> secureParams = {}) { auto system = TTransformSystem(); + system.Gateway = IHTTPMockGateway::Make(); + NDqProto::TDqTransform taskTransform; taskTransform.SetType(NDqProto::ETransformType::TRANSFORM_CLOUD_FUNCTION); taskTransform.SetFunctionName(transformName); + if (!connectionName.empty()) { + taskTransform.SetConnectionName(connectionName); + } TStructMember inMembers[] = { {"A", TDataType::Create(NUdf::TDataType<bool>::Id, *TypeEnv)}, @@ -89,12 +96,22 @@ public: system.OutputChannel = CreateDqOutputChannel(nodeId + 400, outputType, *TypeEnv, *HolderFactory, settings, logger); auto outputConsumer = CreateOutputMapConsumer(system.OutputChannel); + auto credentialsFactory = CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory("", true, ""); system.ComputeActorId = ActorRuntime_->AllocateEdgeActor(); - system.TransformActor = new TCloudFunctionTransformActor(system.ComputeActorId, - taskTransform, Gateway, - system.InputChannel, outputConsumer, - *HolderFactory, *TypeEnv, ProgramBuilder); - system.TransformActorId = ActorRuntime_->Register(system.TransformActor, nodeId); + auto args = TDqTransformActorFactory::TArguments{ + .ComputeActorId = system.ComputeActorId, + .TransformInput = system.InputChannel, + .TransformOutput = outputConsumer, + .HolderFactory = *HolderFactory, + .TypeEnv = *TypeEnv, + .ProgramBuilder = ProgramBuilder, + .SecureParams = secureParams + }; + + NActors::IActor* transformActor; + std::tie(std::ignore, transformActor) = CreateCloudFunctionTransformActor(taskTransform, system.Gateway, + credentialsFactory, std::move(args)); + system.TransformActorId = ActorRuntime_->Register(transformActor, nodeId); return system; } @@ -133,7 +150,7 @@ public: auto url = "https://functions.yandexcloud.net/multiplyC"; auto headers = IHTTPGateway::THeaders(); auto postBody = "[{\"A\":\"true\",\"C\":\"555\"}]"; - Gateway->AddDownloadResponse(url, headers, postBody, multiplyCResponse); + context.Gateway->AddDownloadResponse(url, headers, postBody, multiplyCResponse); NUdf::TUnboxedValue* items; auto value = ValueBuilder->NewArray(2, items); @@ -169,9 +186,9 @@ public: auto url = "https://functions.yandexcloud.net/multiplyCBatch"; auto headers = IHTTPGateway::THeaders(); auto postBody1 = "[{\"A\":\"true\",\"C\":\"430\"}]"; - Gateway->AddDownloadResponse(url, headers, postBody1, multiplyCResponse1); + context.Gateway->AddDownloadResponse(url, headers, postBody1, multiplyCResponse1); auto postBody2 = "[{\"A\":\"false\",\"C\":\"800\"}]"; - Gateway->AddDownloadResponse(url, headers, postBody2, multiplyCResponse2); + context.Gateway->AddDownloadResponse(url, headers, postBody2, multiplyCResponse2); NUdf::TUnboxedValue* items1; auto value1 = ValueBuilder->NewArray(2, items1); @@ -203,6 +220,38 @@ public: transformedRows.clear(); } + void TestPrivateFunction() { + const auto token = "{\"token\":\"yVVeGG6W0ccToZw\"}"; + const THashMap<TString, TString> secureParams = {{"private_cf_connection", token}}; + auto context = StartUpTransformSystem(0, "multiplyPrivate", "private_cf_connection", secureParams); + + IHTTPMockGateway::TDataResponse multiplyPrivateResponse = []() { + return IHTTPGateway::TContent("[{\"A\":true,\"C\":360}]", 200); + }; + + auto url = "https://functions.yandexcloud.net/multiplyPrivate"; + auto headers = IHTTPGateway::THeaders{"Authorization: Bearer yVVeGG6W0ccToZw"}; + auto postBody = "[{\"A\":\"true\",\"C\":\"180\"}]"; + context.Gateway->AddDownloadResponse(url, headers, postBody, multiplyPrivateResponse); + + NUdf::TUnboxedValue* items; + auto value = ValueBuilder->NewArray(2, items); + items[0] = NUdf::TUnboxedValuePod(true); + items[1] = NUdf::TUnboxedValuePod(ui32(180)); + + context.InputChannel->Push(std::move(value)); + context.InputChannel->Finish(); + + auto executeEv = new TCloudFunctionTransformActor::TCFTransformEvent::TEvExecuteTransform(); + ActorRuntime_->Send(new IEventHandle(context.TransformActorId, context.ComputeActorId, executeEv)); + auto event = ActorRuntime_->GrabEdgeEvent<NTransformActor::TEvTransformCompleted>(context.ComputeActorId, TDuration::Seconds(1)); + + UNIT_ASSERT(event); + NKikimr::NMiniKQL::TUnboxedValueVector transformedRows; + context.OutputChannel->PopAll(transformedRows); + transformedRows.clear(); + } + private: THolder<NActors::TTestActorRuntimeBase> ActorRuntime_; @@ -215,7 +264,6 @@ private: std::unique_ptr<NMiniKQL::THolderFactory> HolderFactory; std::unique_ptr<TDefaultValueBuilder> ValueBuilder; - IHTTPMockGateway::TPtr Gateway; }; UNIT_TEST_SUITE_REGISTRATION(TCloudFunctionTransformTest) |