aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhrustyashko <hrustyashko@yandex-team.ru>2022-03-28 17:10:15 +0300
committerhrustyashko <hrustyashko@yandex-team.ru>2022-03-28 17:10:15 +0300
commit1a84b3ff343ca0a6364fcc54ea40aee919354f43 (patch)
tree413cb5dbe29841f9860786bb9529b7bf4d054237
parent3e9fdacb8297268fdf38f64d3bf4065f48a2d4d7 (diff)
downloadydb-1a84b3ff343ca0a6364fcc54ea40aee919354f43.tar.gz
Получать IAM токен для трансформации
ref:6a690395ca2ca76ee82b4e4929559fa26278d834
-rw-r--r--ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h2
-rw-r--r--ydb/library/yql/dq/proto/dq_tasks.proto2
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt2
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp24
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h13
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp7
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h4
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp72
8 files changed, 104 insertions, 22 deletions
diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h b/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h
index 2b57e288ff..7912149f05 100644
--- a/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h
+++ b/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h
@@ -23,6 +23,8 @@ public:
const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder;
+
+ const THashMap<TString, TString>& SecureParams;
};
TDqTransformActorFactory();
diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto
index 36d73316fe..8e6c760ec3 100644
--- a/ydb/library/yql/dq/proto/dq_tasks.proto
+++ b/ydb/library/yql/dq/proto/dq_tasks.proto
@@ -137,7 +137,7 @@ enum ETransformType {
message TDqTransform {
ETransformType Type = 1;
string FunctionName = 2;
- string ConnectionName = 3;
+ optional string ConnectionName = 3;
// binary YSON(codec ydb/library/yql/providers/common/schema/mkql)
string InputType = 4;
diff --git a/ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt b/ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt
index 61a6a9637f..fc94136494 100644
--- a/ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt
+++ b/ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt
@@ -26,6 +26,8 @@ target_link_libraries(providers-cloud_function-actors PUBLIC
yql-minikql-computation
yql-public-issue
yql-sql-pg_dummy
+ common-token_accessor-client
+ client-ydb_types-credentials
)
target_sources(providers-cloud_function-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp
diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp
index dc4534c9db..2c008cf057 100644
--- a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp
+++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp
@@ -55,6 +55,7 @@ const auto RETRY_POLICY = IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryB
TCloudFunctionTransformActor::TCloudFunctionTransformActor(TActorId owner,
NDqProto::TDqTransform transform, IHTTPGateway::TPtr gateway,
+ NYdb::TCredentialsProviderPtr credentialsProvider,
IDqOutputChannel::TPtr transformInput,
IDqOutputConsumer::TPtr taskOutput,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
@@ -64,6 +65,7 @@ TCloudFunctionTransformActor::TCloudFunctionTransformActor(TActorId owner,
, Owner(owner)
, Transform(transform)
, Gateway(std::move(gateway))
+, CredentialsProvider(std::move(credentialsProvider))
, TransformInput(std::move(transformInput))
, TaskOutput(std::move(taskOutput))
, TransformName(UrlEscapeRet(transform.GetFunctionName(), true))
@@ -147,7 +149,10 @@ void TCloudFunctionTransformActor::Handle(TCFTransformEvent::TEvExecuteTransform
TransformInProgress = true;
auto* actorSystem = TActivationContext::ActorSystem();
- auto headers = IHTTPGateway::THeaders();
+ const auto iamToken = CredentialsProvider->GetAuthInfo();
+ auto headers = iamToken.empty()
+ ? IHTTPGateway::THeaders()
+ : IHTTPGateway::THeaders{TString("Authorization: ") + iamToken};
Gateway->Download(CLOUD_FUNCTION_BASE_URL + TransformName, headers, 10,
std::bind(&TCloudFunctionTransformActor::OnInvokeFinished, actorSystem, SelfId(),
context,
@@ -305,9 +310,22 @@ TGuard<NKikimr::NMiniKQL::TScopedAlloc> TCloudFunctionTransformActor::BindAlloca
return TypeEnv.BindAllocator();
}
-std::pair<IDqTransformActor*, NActors::IActor*> CreateCloudFunctionTransformActor(const NDqProto::TDqTransform& transform, IHTTPGateway::TPtr gateway, TDqTransformActorFactory::TArguments&& args) {
+std::pair<IDqTransformActor*, NActors::IActor*> CreateCloudFunctionTransformActor(
+ const NDqProto::TDqTransform& transform, IHTTPGateway::TPtr gateway,
+ ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ TDqTransformActorFactory::TArguments&& args) {
+
+ std::shared_ptr<NYdb::ICredentialsProviderFactory> credProviderFactory;
+ if (transform.HasConnectionName()) {
+ const auto token = args.SecureParams.Value(transform.GetConnectionName(), TString{});
+ credProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token, true);
+ } else {
+ credProviderFactory = NYdb::CreateInsecureCredentialsProviderFactory();
+ }
+
const auto actor = new TCloudFunctionTransformActor(
- args.ComputeActorId, transform, gateway,
+ args.ComputeActorId, transform,
+ gateway, credProviderFactory->CreateProvider(),
args.TransformInput, args.TransformOutput,
args.HolderFactory, args.TypeEnv,
args.ProgramBuilder);
diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h
index 85c9b9f073..a8402c7264 100644
--- a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h
+++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h
@@ -9,8 +9,10 @@
#include <ydb/library/yql/dq/runtime/dq_input_channel.h>
#include <ydb/library/yql/dq/runtime/dq_output_channel.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_program_builder.h>
+#include <ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -54,7 +56,9 @@ public:
static constexpr char ActorName[] = "YQL_DQ_CLOUD_FUNC_TRANSFORM";
TCloudFunctionTransformActor(TActorId owner,
- NDqProto::TDqTransform transform, IHTTPGateway::TPtr gateway,
+ NDqProto::TDqTransform transform,
+ IHTTPGateway::TPtr gateway,
+ NYdb::TCredentialsProviderPtr credentialsProvider,
IDqOutputChannel::TPtr transformInput,
IDqOutputConsumer::TPtr taskOutput,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
@@ -102,6 +106,8 @@ private:
NDqProto::TDqTransform Transform;
IHTTPGateway::TPtr Gateway;
+ NYdb::TCredentialsProviderPtr CredentialsProvider;
+
IDqOutputChannel::TPtr TransformInput;
IDqOutputConsumer::TPtr TaskOutput;
@@ -117,6 +123,9 @@ private:
NMiniKQL::TType* OutputRowsType = nullptr;
};
-std::pair<IDqTransformActor*, NActors::IActor*> CreateCloudFunctionTransformActor(const NDqProto::TDqTransform& transform, IHTTPGateway::TPtr gateway, TDqTransformActorFactory::TArguments&& args);
+std::pair<IDqTransformActor*, NActors::IActor*> CreateCloudFunctionTransformActor(
+ const NDqProto::TDqTransform& transform, IHTTPGateway::TPtr gateway,
+ ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
+ TDqTransformActorFactory::TArguments&& args);
} \ No newline at end of file
diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp
index ab4d33e308..9e16301cfc 100644
--- a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp
+++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp
@@ -5,10 +5,11 @@
namespace NYql::NDq {
-void RegisterTransformCloudFunction(TDqTransformActorFactory& factory, IHTTPGateway::TPtr gateway) {
+void RegisterTransformCloudFunction(TDqTransformActorFactory& factory, IHTTPGateway::TPtr gateway,
+ ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory) {
constexpr NDqProto::ETransformType type = NDqProto::ETransformType::TRANSFORM_CLOUD_FUNCTION;
- factory.Register(type, [gateway](const NDqProto::TDqTransform& transform, TDqTransformActorFactory::TArguments&& args) {
- return CreateCloudFunctionTransformActor(transform, gateway, std::move(args));
+ factory.Register(type, [gateway, credentialsFactory](const NDqProto::TDqTransform& transform, TDqTransformActorFactory::TArguments&& args) {
+ return CreateCloudFunctionTransformActor(transform, gateway, credentialsFactory, std::move(args));
});
}
diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h
index d17f556c73..681fd1cde1 100644
--- a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h
+++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h
@@ -2,9 +2,11 @@
#include <ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
namespace NYql::NDq {
-void RegisterTransformCloudFunction(TDqTransformActorFactory& factory, IHTTPGateway::TPtr gateway);
+void RegisterTransformCloudFunction(TDqTransformActorFactory& factory, IHTTPGateway::TPtr gateway,
+ ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory);
} \ No newline at end of file
diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp
index df3dfe2a9b..863bdba592 100644
--- a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp
+++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp
@@ -10,6 +10,7 @@
#include <ydb/library/yql/minikql/computation/mkql_value_builder.h>
#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
#include <ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.h>
+#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <thread>
#include <chrono>
@@ -27,9 +28,9 @@ using namespace std::chrono_literals;
struct TTransformSystem {
IDqOutputChannel::TPtr InputChannel;
IDqOutputChannel::TPtr OutputChannel;
- TCloudFunctionTransformActor* TransformActor;
NActors::TActorId TransformActorId;
NActors::TActorId ComputeActorId;
+ IHTTPMockGateway::TPtr Gateway;
};
class TCloudFunctionTransformTest: public TTestBase {
@@ -38,6 +39,7 @@ public:
UNIT_TEST(TestEmptyChannel)
UNIT_TEST(TestMultiplicationTransform)
UNIT_TEST(TestSeveralIterations)
+ UNIT_TEST(TestPrivateFunction)
UNIT_TEST_SUITE_END();
TCloudFunctionTransformTest()
@@ -46,19 +48,24 @@ public:
, FunctionRegistry(CreateFunctionRegistry(CreateBuiltinRegistry()))
, ProgramBuilder(*TypeEnv, *FunctionRegistry)
, MemInfo("Mem")
- , Gateway(IHTTPMockGateway::Make())
{
HolderFactory = std::make_unique<NMiniKQL::THolderFactory>(Alloc->Ref(), MemInfo);
ValueBuilder = std::make_unique<TDefaultValueBuilder>(*HolderFactory);
}
- TTransformSystem StartUpTransformSystem(ui32 nodeId, TString transformName) {
+ TTransformSystem StartUpTransformSystem(ui32 nodeId, TString transformName,
+ TString connectionName = {}, THashMap<TString, TString> secureParams = {}) {
auto system = TTransformSystem();
+ system.Gateway = IHTTPMockGateway::Make();
+
NDqProto::TDqTransform taskTransform;
taskTransform.SetType(NDqProto::ETransformType::TRANSFORM_CLOUD_FUNCTION);
taskTransform.SetFunctionName(transformName);
+ if (!connectionName.empty()) {
+ taskTransform.SetConnectionName(connectionName);
+ }
TStructMember inMembers[] = {
{"A", TDataType::Create(NUdf::TDataType<bool>::Id, *TypeEnv)},
@@ -89,12 +96,22 @@ public:
system.OutputChannel = CreateDqOutputChannel(nodeId + 400, outputType, *TypeEnv, *HolderFactory, settings, logger);
auto outputConsumer = CreateOutputMapConsumer(system.OutputChannel);
+ auto credentialsFactory = CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory("", true, "");
system.ComputeActorId = ActorRuntime_->AllocateEdgeActor();
- system.TransformActor = new TCloudFunctionTransformActor(system.ComputeActorId,
- taskTransform, Gateway,
- system.InputChannel, outputConsumer,
- *HolderFactory, *TypeEnv, ProgramBuilder);
- system.TransformActorId = ActorRuntime_->Register(system.TransformActor, nodeId);
+ auto args = TDqTransformActorFactory::TArguments{
+ .ComputeActorId = system.ComputeActorId,
+ .TransformInput = system.InputChannel,
+ .TransformOutput = outputConsumer,
+ .HolderFactory = *HolderFactory,
+ .TypeEnv = *TypeEnv,
+ .ProgramBuilder = ProgramBuilder,
+ .SecureParams = secureParams
+ };
+
+ NActors::IActor* transformActor;
+ std::tie(std::ignore, transformActor) = CreateCloudFunctionTransformActor(taskTransform, system.Gateway,
+ credentialsFactory, std::move(args));
+ system.TransformActorId = ActorRuntime_->Register(transformActor, nodeId);
return system;
}
@@ -133,7 +150,7 @@ public:
auto url = "https://functions.yandexcloud.net/multiplyC";
auto headers = IHTTPGateway::THeaders();
auto postBody = "[{\"A\":\"true\",\"C\":\"555\"}]";
- Gateway->AddDownloadResponse(url, headers, postBody, multiplyCResponse);
+ context.Gateway->AddDownloadResponse(url, headers, postBody, multiplyCResponse);
NUdf::TUnboxedValue* items;
auto value = ValueBuilder->NewArray(2, items);
@@ -169,9 +186,9 @@ public:
auto url = "https://functions.yandexcloud.net/multiplyCBatch";
auto headers = IHTTPGateway::THeaders();
auto postBody1 = "[{\"A\":\"true\",\"C\":\"430\"}]";
- Gateway->AddDownloadResponse(url, headers, postBody1, multiplyCResponse1);
+ context.Gateway->AddDownloadResponse(url, headers, postBody1, multiplyCResponse1);
auto postBody2 = "[{\"A\":\"false\",\"C\":\"800\"}]";
- Gateway->AddDownloadResponse(url, headers, postBody2, multiplyCResponse2);
+ context.Gateway->AddDownloadResponse(url, headers, postBody2, multiplyCResponse2);
NUdf::TUnboxedValue* items1;
auto value1 = ValueBuilder->NewArray(2, items1);
@@ -203,6 +220,38 @@ public:
transformedRows.clear();
}
+ void TestPrivateFunction() {
+ const auto token = "{\"token\":\"yVVeGG6W0ccToZw\"}";
+ const THashMap<TString, TString> secureParams = {{"private_cf_connection", token}};
+ auto context = StartUpTransformSystem(0, "multiplyPrivate", "private_cf_connection", secureParams);
+
+ IHTTPMockGateway::TDataResponse multiplyPrivateResponse = []() {
+ return IHTTPGateway::TContent("[{\"A\":true,\"C\":360}]", 200);
+ };
+
+ auto url = "https://functions.yandexcloud.net/multiplyPrivate";
+ auto headers = IHTTPGateway::THeaders{"Authorization: Bearer yVVeGG6W0ccToZw"};
+ auto postBody = "[{\"A\":\"true\",\"C\":\"180\"}]";
+ context.Gateway->AddDownloadResponse(url, headers, postBody, multiplyPrivateResponse);
+
+ NUdf::TUnboxedValue* items;
+ auto value = ValueBuilder->NewArray(2, items);
+ items[0] = NUdf::TUnboxedValuePod(true);
+ items[1] = NUdf::TUnboxedValuePod(ui32(180));
+
+ context.InputChannel->Push(std::move(value));
+ context.InputChannel->Finish();
+
+ auto executeEv = new TCloudFunctionTransformActor::TCFTransformEvent::TEvExecuteTransform();
+ ActorRuntime_->Send(new IEventHandle(context.TransformActorId, context.ComputeActorId, executeEv));
+ auto event = ActorRuntime_->GrabEdgeEvent<NTransformActor::TEvTransformCompleted>(context.ComputeActorId, TDuration::Seconds(1));
+
+ UNIT_ASSERT(event);
+ NKikimr::NMiniKQL::TUnboxedValueVector transformedRows;
+ context.OutputChannel->PopAll(transformedRows);
+ transformedRows.clear();
+ }
+
private:
THolder<NActors::TTestActorRuntimeBase> ActorRuntime_;
@@ -215,7 +264,6 @@ private:
std::unique_ptr<NMiniKQL::THolderFactory> HolderFactory;
std::unique_ptr<TDefaultValueBuilder> ValueBuilder;
- IHTTPMockGateway::TPtr Gateway;
};
UNIT_TEST_SUITE_REGISTRATION(TCloudFunctionTransformTest)