aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhrustyashko <hrustyashko@yandex-team.ru>2022-03-25 20:59:44 +0300
committerhrustyashko <hrustyashko@yandex-team.ru>2022-03-25 20:59:44 +0300
commit25ded0237971981b125ad435b3845969585a2ad7 (patch)
treec4009b93b45d4f380d19aa37629615fc0a8bab43
parent67e2ad960068f40e2dbcbce84908b06b91603a19 (diff)
downloadydb-25ded0237971981b125ad435b3845969585a2ad7.tar.gz
Actor для вызова трансформации через Cloud Function
ref:58a020f15a89ef366baf4da257052687121d2101
-rw-r--r--CMakeLists.darwin.txt4
-rw-r--r--CMakeLists.linux.txt4
-rw-r--r--ydb/core/protos/services.proto2
-rw-r--r--ydb/library/yql/dq/actors/dq_events_ids.h2
-rw-r--r--ydb/library/yql/dq/actors/transform/CMakeLists.txt24
-rw-r--r--ydb/library/yql/dq/actors/transform/dq_transform_actor.h11
-rw-r--r--ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.cpp30
-rw-r--r--ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h39
-rw-r--r--ydb/library/yql/dq/actors/transform/dq_transform_events.cpp1
-rw-r--r--ydb/library/yql/dq/actors/transform/dq_transform_events.h23
-rw-r--r--ydb/library/yql/dq/opt/dq_opt.cpp2
-rw-r--r--ydb/library/yql/dq/opt/dq_opt.h2
-rw-r--r--ydb/library/yql/dq/proto/dq_tasks.proto7
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel.cpp3
-rw-r--r--ydb/library/yql/dq/tasks/dq_connection_builder.h8
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt33
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp317
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h122
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp15
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h10
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp222
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.darwin.txt55
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.linux.txt56
-rw-r--r--ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.txt13
-rw-r--r--ydb/library/yql/providers/common/http_gateway/mock/CMakeLists.txt21
-rw-r--r--ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp92
-rw-r--r--ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.h25
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp22
-rw-r--r--ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h5
-rw-r--r--ydb/library/yql/providers/dq/planner/execution_planner.cpp12
30 files changed, 1165 insertions, 17 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt
index e3decdfe93..26d76167eb 100644
--- a/CMakeLists.darwin.txt
+++ b/CMakeLists.darwin.txt
@@ -1192,12 +1192,14 @@ add_subdirectory(ydb/core/yq/libs/hmac/ut)
add_subdirectory(ydb/core/yq/libs/result_formatter/ut)
add_subdirectory(ydb/core/yq/libs/signer/ut)
add_subdirectory(ydb/core/yq/libs/test_connection/ut)
+add_subdirectory(ydb/library/yql/dq/actors/transform)
add_subdirectory(ydb/library/yql/dq/actors/compute/ut)
add_subdirectory(ydb/library/yql/dq/runtime/ut)
add_subdirectory(ydb/library/yql/dq/state/ut)
add_subdirectory(ydb/library/yql/parser/pg_catalog/ut)
add_subdirectory(ydb/library/yql/providers/common/schema)
add_subdirectory(ydb/library/yql/providers/common/schema/skiff)
+add_subdirectory(ydb/library/yql/providers/cloud_function/actors)
add_subdirectory(ydb/library/yql/public/decimal/ut)
add_subdirectory(ydb/library/yql/public/issue/ut)
add_subdirectory(ydb/library/yql/public/udf/ut)
@@ -1214,6 +1216,8 @@ add_subdirectory(ydb/public/sdk/cpp/examples/ttl)
add_subdirectory(ydb/library/yql/providers/common/codec/ut)
add_subdirectory(ydb/library/yql/providers/common/structured_token/ut)
add_subdirectory(ydb/library/yql/providers/pq/gateway/dummy)
+add_subdirectory(ydb/library/yql/providers/cloud_function/actors/ut)
+add_subdirectory(ydb/library/yql/providers/common/http_gateway/mock)
add_subdirectory(ydb/library/yql/udfs/common/stat/ut)
add_subdirectory(ydb/library/yql/udfs/common/topfreq/ut)
add_subdirectory(ydb/public/sdk/cpp/client/extensions/discovery_mutator/ut)
diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt
index 2b096d75ca..a80add5dfc 100644
--- a/CMakeLists.linux.txt
+++ b/CMakeLists.linux.txt
@@ -1287,12 +1287,14 @@ add_subdirectory(ydb/core/yq/libs/hmac/ut)
add_subdirectory(ydb/core/yq/libs/result_formatter/ut)
add_subdirectory(ydb/core/yq/libs/signer/ut)
add_subdirectory(ydb/core/yq/libs/test_connection/ut)
+add_subdirectory(ydb/library/yql/dq/actors/transform)
add_subdirectory(ydb/library/yql/dq/actors/compute/ut)
add_subdirectory(ydb/library/yql/dq/runtime/ut)
add_subdirectory(ydb/library/yql/dq/state/ut)
add_subdirectory(ydb/library/yql/parser/pg_catalog/ut)
add_subdirectory(ydb/library/yql/providers/common/schema)
add_subdirectory(ydb/library/yql/providers/common/schema/skiff)
+add_subdirectory(ydb/library/yql/providers/cloud_function/actors)
add_subdirectory(ydb/library/yql/public/decimal/ut)
add_subdirectory(ydb/library/yql/public/issue/ut)
add_subdirectory(ydb/library/yql/public/udf/ut)
@@ -1309,6 +1311,8 @@ add_subdirectory(ydb/public/sdk/cpp/examples/ttl)
add_subdirectory(ydb/library/yql/providers/common/codec/ut)
add_subdirectory(ydb/library/yql/providers/common/structured_token/ut)
add_subdirectory(ydb/library/yql/providers/pq/gateway/dummy)
+add_subdirectory(ydb/library/yql/providers/cloud_function/actors/ut)
+add_subdirectory(ydb/library/yql/providers/common/http_gateway/mock)
add_subdirectory(ydb/library/yql/udfs/common/stat/ut)
add_subdirectory(ydb/library/yql/udfs/common/topfreq/ut)
add_subdirectory(ydb/public/sdk/cpp/client/extensions/discovery_mutator/ut)
diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto
index 76fd4b5979..72b8384cc3 100644
--- a/ydb/core/protos/services.proto
+++ b/ydb/core/protos/services.proto
@@ -267,6 +267,8 @@ enum EServiceKikimr {
YQL_NODES_MANAGER = 715;
+ YANDEX_CLOUD_FUNCTION = 716;
+
// Exports (& imports)
EXPORT = 800;
DATASHARD_RESTORE = 801;
diff --git a/ydb/library/yql/dq/actors/dq_events_ids.h b/ydb/library/yql/dq/actors/dq_events_ids.h
index 2049edee75..1022c93201 100644
--- a/ydb/library/yql/dq/actors/dq_events_ids.h
+++ b/ydb/library/yql/dq/actors/dq_events_ids.h
@@ -50,6 +50,8 @@ struct TDqComputeEvents {
EvNewCheckpointCoordinatorAck,
EvNewSourceDataArrived,
EvSourceError,
+ EvTransformNewData,
+ EvTransformCompleted,
// place all new events here
EvEnd
diff --git a/ydb/library/yql/dq/actors/transform/CMakeLists.txt b/ydb/library/yql/dq/actors/transform/CMakeLists.txt
new file mode 100644
index 0000000000..4dc0fe6ead
--- /dev/null
+++ b/ydb/library/yql/dq/actors/transform/CMakeLists.txt
@@ -0,0 +1,24 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(dq-actors-transform)
+target_compile_options(dq-actors-transform PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(dq-actors-transform PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ yql-dq-runtime
+ yql-dq-proto
+)
+target_sources(dq-actors-transform PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/transform/dq_transform_events.cpp
+)
diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_actor.h b/ydb/library/yql/dq/actors/transform/dq_transform_actor.h
new file mode 100644
index 0000000000..0f8e1a6394
--- /dev/null
+++ b/ydb/library/yql/dq/actors/transform/dq_transform_actor.h
@@ -0,0 +1,11 @@
+#pragma once
+
+namespace NYql::NDq {
+
+// Abstract interface of a transform actor: a single DoTransform() entry point
+// plus a virtual destructor so implementations can be destroyed through this type.
+class IDqTransformActor {
+public:
+ // Requests a transformation step; what exactly happens is defined by the implementation.
+ virtual void DoTransform() = 0;
+ virtual ~IDqTransformActor() = default;
+};
+
+} \ No newline at end of file
diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.cpp b/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.cpp
new file mode 100644
index 0000000000..f7bed20f83
--- /dev/null
+++ b/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.cpp
@@ -0,0 +1,30 @@
+#include "dq_transform_actor_factory.h"
+
+#include <ydb/library/yql/utils/yql_panic.h>
+#include <google/protobuf/text_format.h>
+
+namespace NYql::NDq {
+
+// Creates a factory with no creators registered; creators are added later via Register().
+TDqTransformActorFactory::TDqTransformActorFactory()
+{}
+
+// Instantiates the transform actor registered for transform.GetType().
+//
+// Returns the same object seen through both of its interfaces (transform and actor).
+// Fails via YQL_ENSURE when no creator is registered for the requested type and via
+// Y_VERIFY when the creator hands back a null pointer in either slot of the pair.
+std::pair<IDqTransformActor*, NActors::IActor*> TDqTransformActorFactory::CreateDqTransformActor(const NDqProto::TDqTransform& transform, TArguments&& args) {
+    const auto creator = CreatorsByType.find(transform.GetType());
+    if (creator == CreatorsByType.end()) {
+        // The type arrives inside a protobuf message and may carry a number this binary
+        // has no enum value for; FindValueByNumber() returns nullptr in that case, so
+        // fall back to the raw number instead of dereferencing a null descriptor.
+        const google::protobuf::EnumDescriptor* descriptor = NDqProto::ETransformType_descriptor();
+        const google::protobuf::EnumValueDescriptor* value = descriptor->FindValueByNumber(transform.GetType());
+        const std::string transformName = value
+            ? std::string(value->name())
+            : std::to_string(static_cast<int>(transform.GetType()));
+        YQL_ENSURE(false, "Unregistered type of transform actor: \"" << transformName << "\"");
+    }
+
+    std::pair<IDqTransformActor*, NActors::IActor*> actor = (creator->second)(transform, std::move(args));
+    Y_VERIFY(actor.first);
+    Y_VERIFY(actor.second);
+    return actor;
+}
+
+// Associates a creator with a transform type. Registering the same type twice
+// trips Y_VERIFY, because emplace() refuses to overwrite an existing entry.
+void TDqTransformActorFactory::Register(NDqProto::ETransformType type, TTransformCreator creator) {
+    const bool inserted = CreatorsByType.emplace(type, std::move(creator)).second;
+    Y_VERIFY(inserted);
+}
+
+} \ No newline at end of file
diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h b/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h
new file mode 100644
index 0000000000..2b57e288ff
--- /dev/null
+++ b/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h
@@ -0,0 +1,39 @@
+#pragma once
+
+#include "dq_transform_actor.h"
+
+#include <ydb/library/yql/dq/common/dq_common.h>
+#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
+#include <ydb/library/yql/dq/runtime/dq_output_consumer.h>
+#include <ydb/library/yql/dq/runtime/dq_output_channel.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
+#include <library/cpp/actors/core/actor.h>
+
+namespace NYql::NDq {
+
+// Registry that maps a transform type to a function creating the matching transform actor.
+class TDqTransformActorFactory : public TThrRefBase {
+public:
+ using TPtr = TIntrusivePtr<TDqTransformActorFactory>;
+
+ // Everything a creator receives besides the transform description itself.
+ // All members are const or references, so moving a TArguments still copies the members.
+ struct TArguments {
+ const NActors::TActorId ComputeActorId;
+ const IDqOutputChannel::TPtr TransformInput;
+ const IDqOutputConsumer::TPtr TransformOutput;
+ const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
+ const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
+ NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder;
+ };
+
+ TDqTransformActorFactory();
+
+ // A creator returns one object exposed through two interfaces:
+ // the transform interface and the actor interface.
+ using TTransformCreator = std::function<std::pair<IDqTransformActor*, NActors::IActor*>(
+ const NDqProto::TDqTransform& transform, TArguments&& args)>;
+
+ // Looks up the creator registered for transform.GetType() and invokes it.
+ std::pair<IDqTransformActor*, NActors::IActor*> CreateDqTransformActor(const NDqProto::TDqTransform& transform, TArguments&& args);
+ // Adds a creator for the given type; each type may be registered only once.
+ void Register(NDqProto::ETransformType type, TTransformCreator creator);
+
+private:
+ std::unordered_map<NDqProto::ETransformType, TTransformCreator> CreatorsByType;
+};
+} \ No newline at end of file
diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_events.cpp b/ydb/library/yql/dq/actors/transform/dq_transform_events.cpp
new file mode 100644
index 0000000000..b48f009e34
--- /dev/null
+++ b/ydb/library/yql/dq/actors/transform/dq_transform_events.cpp
@@ -0,0 +1 @@
+#include "dq_transform_events.h" \ No newline at end of file
diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_events.h b/ydb/library/yql/dq/actors/transform/dq_transform_events.h
new file mode 100644
index 0000000000..a41c79480e
--- /dev/null
+++ b/ydb/library/yql/dq/actors/transform/dq_transform_events.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+#include <ydb/library/yql/dq/actors/dq_events_ids.h>
+
+#include <library/cpp/actors/core/events.h>
+#include <library/cpp/actors/core/event_local.h>
+
+namespace NYql::NDq {
+using namespace NActors;
+
+// Payload-free local events a transform actor sends to its owner.
+namespace NTransformActor {
+ // Signals that newly transformed data is available.
+ struct TEvTransformNewData : public TEventLocal<TEvTransformNewData, TDqComputeEvents::EvTransformNewData> {
+ TEvTransformNewData() {}
+ };
+
+ // Signals that the transform has finished.
+ struct TEvTransformCompleted : public TEventLocal<TEvTransformCompleted, TDqComputeEvents::EvTransformCompleted> {
+ TEvTransformCompleted() {}
+ };
+
+} // namespace NTransformActor
+
+} \ No newline at end of file
diff --git a/ydb/library/yql/dq/opt/dq_opt.cpp b/ydb/library/yql/dq/opt/dq_opt.cpp
index f60f29dd46..45175199fb 100644
--- a/ydb/library/yql/dq/opt/dq_opt.cpp
+++ b/ydb/library/yql/dq/opt/dq_opt.cpp
@@ -29,7 +29,7 @@ TDqStageSettings TDqStageSettings::Parse(const TDqStageBase& node) {
} else if (name == TransformTypeSetting) {
YQL_ENSURE(tuple.Value().Maybe<TCoAtom>());
if (const auto type = tuple.Value().Cast<TCoAtom>().Value(); type == "YANDEX-CLOUD") {
- settings.TransformType = NDqProto::TRANSFORM_YANDEX_CLOUD;
+ settings.TransformType = NDqProto::TRANSFORM_CLOUD_FUNCTION;
} else {
YQL_ENSURE(false, "Unknown transform type: " << type);
}
diff --git a/ydb/library/yql/dq/opt/dq_opt.h b/ydb/library/yql/dq/opt/dq_opt.h
index 5b3146d0f4..cbb0714aa4 100644
--- a/ydb/library/yql/dq/opt/dq_opt.h
+++ b/ydb/library/yql/dq/opt/dq_opt.h
@@ -22,7 +22,7 @@ struct TDqStageSettings {
bool SinglePartition = false;
bool IsExternalFunction = false;
- NDqProto::ETransformType TransformType = NDqProto::TRANSFORM_YANDEX_CLOUD;
+ NDqProto::ETransformType TransformType = NDqProto::TRANSFORM_CLOUD_FUNCTION;
TString TransformName;
ui32 TransformConcurrency = 0;
diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto
index e29e64ef2c..36d73316fe 100644
--- a/ydb/library/yql/dq/proto/dq_tasks.proto
+++ b/ydb/library/yql/dq/proto/dq_tasks.proto
@@ -130,13 +130,18 @@ message TTaskOutput {
}
enum ETransformType {
- TRANSFORM_YANDEX_CLOUD = 0;
+ TRANSFORM_UNSPECIFIED = 0;
+ TRANSFORM_CLOUD_FUNCTION = 1;
}
message TDqTransform {
ETransformType Type = 1;
string FunctionName = 2;
string ConnectionName = 3;
+
+ // binary YSON(codec ydb/library/yql/providers/common/schema/mkql)
+ string InputType = 4;
+ string OutputType = 5;
}
message TDqTask {
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index 211d4ac39a..e543e1722b 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -280,6 +280,7 @@ public:
ui64 Drop() override { // Drop channel data because channel was finished. Leave checkpoint because checkpoints keep going through channel after finishing channel data transfer.
ui64 rows = Data.size();
Data.clear();
+ TDataType().swap(Data);
return rows;
}
@@ -757,6 +758,8 @@ public:
DataTail.clear();
SizeTail.clear();
MemoryUsed = 0;
+ TDataType().swap(DataHead);
+ TDataType().swap(DataTail);
// todo: send remove request
SpilledRows = 0;
SpilledBlobs.clear();
diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h
index 2c46061f60..4f4cd8189b 100644
--- a/ydb/library/yql/dq/tasks/dq_connection_builder.h
+++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h
@@ -47,9 +47,11 @@ void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutp
for (ui32 i = 0; i < partitionsCount; ++i) {
auto& task = graph.AddTask(stageInfo);
- auto& transform = task.OutputTransform;
- transform.Type = stageSettings.TransformType;
- transform.FunctionName = stageSettings.TransformName;
+ if (stageSettings.IsExternalFunction) {
+ auto& transform = task.OutputTransform;
+ transform.Type = stageSettings.TransformType;
+ transform.FunctionName = stageSettings.TransformName;
+ }
}
}
diff --git a/ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt b/ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt
new file mode 100644
index 0000000000..61a6a9637f
--- /dev/null
+++ b/ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt
@@ -0,0 +1,33 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-cloud_function-actors)
+target_compile_options(providers-cloud_function-actors PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-cloud_function-actors PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ cpp-actors-core
+ library-cpp-retry
+ yql-dq-runtime
+ yql-dq-proto
+ dq-actors-transform
+ providers-common-http_gateway
+ providers-common-codec
+ common-schema-mkql
+ library-yql-minikql
+ yql-minikql-computation
+ yql-public-issue
+ yql-sql-pg_dummy
+)
+target_sources(providers-cloud_function-actors PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp
+)
diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp
new file mode 100644
index 0000000000..dc4534c9db
--- /dev/null
+++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp
@@ -0,0 +1,317 @@
+#include "cloud_function_transform.h"
+
+#include <ydb/core/protos/services.pb.h>
+
+#include <ydb/library/yql/providers/common/codec/yql_json_codec.h>
+#include <ydb/library/yql/minikql/mkql_type_ops.h>
+#include <ydb/library/yql/minikql/mkql_node.h>
+#include <ydb/library/yql/public/issue/yql_issue.h>
+#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
+
+#include <library/cpp/string_utils/base64/base64.h>
+#include <library/cpp/string_utils/quote/quote.h>
+#include <library/cpp/retry/retry_policy.h>
+#include <library/cpp/actors/core/log.h>
+
+#define LOG_D(s) \
+ LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::YANDEX_CLOUD_FUNCTION, "SelfId: " << this->SelfId() << ", Transform: " << TransformName << ". " << s)
+#define LOG_W(s) \
+ LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::YANDEX_CLOUD_FUNCTION, "SelfId: " << this->SelfId() << ", Transform: " << TransformName << ". " << s)
+#define LOG_E(s) \
+ LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::YANDEX_CLOUD_FUNCTION, "SelfId: " << this->SelfId() << ", Transform: " << TransformName << ". " << s)
+
+
+namespace NYql::NDq {
+
+using namespace NActors;
+using namespace NCommon;
+using namespace NKikimr;
+
+namespace {
+
+constexpr TStringBuf CLOUD_FUNCTION_BASE_URL = "https://functions.yandexcloud.net/";
+
+constexpr size_t MAX_RETRY = 3;
+
+// Maps an HTTP status code to a retry class: 429 gets a long retry,
+// 500/503/504 get a short retry, and every other code is not retried.
+ERetryErrorClass RetryByHttpCode(long httpResponseCode) {
+    if (httpResponseCode == 429) {
+        return ERetryErrorClass::LongRetry;
+    }
+
+    const bool isRetriableServerError =
+        httpResponseCode == 500 || httpResponseCode == 503 || httpResponseCode == 504;
+    return isRetriableServerError ? ERetryErrorClass::ShortRetry : ERetryErrorClass::NoRetry;
+}
+
+// Shared exponential-backoff policy for cloud function calls, driven by RetryByHttpCode
+// and limited to MAX_RETRY attempts.
+// NOTE(review): the three durations are presumably the minimal delay, the minimal delay
+// for "long" retries and the maximal delay — confirm against IRetryPolicy's signature.
+const auto RETRY_POLICY = IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryByHttpCode,
+ TDuration::MilliSeconds(100),
+ TDuration::MilliSeconds(400),
+ TDuration::Seconds(5),
+ MAX_RETRY);
+
+}
+
+// Builds the actor and resolves the MiniKQL types used to convert rows to and from JSON.
+//
+// owner          - actor that receives new-data, completion and abort events.
+// transform      - transform description; its function name is URL-escaped into
+//                  TransformName, and its input/output types are parsed from YSON.
+// gateway        - HTTP gateway used to invoke the function.
+// transformInput - channel the rows to transform are read from.
+// taskOutput     - consumer the transformed rows are written to.
+//
+// Fails via YQL_ENSURE when either type cannot be parsed; the parser's own diagnostics
+// are appended to the message so the failure can be traced to the offending type.
+TCloudFunctionTransformActor::TCloudFunctionTransformActor(TActorId owner,
+    NDqProto::TDqTransform transform, IHTTPGateway::TPtr gateway,
+    IDqOutputChannel::TPtr transformInput,
+    IDqOutputConsumer::TPtr taskOutput,
+    const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+    const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
+    NKikimr::NMiniKQL::TProgramBuilder& programBuilder)
+: TActor(&TCloudFunctionTransformActor::StateFunc)
+, Owner(owner)
+, Transform(transform)
+, Gateway(std::move(gateway))
+, TransformInput(std::move(transformInput))
+, TaskOutput(std::move(taskOutput))
+, TransformName(UrlEscapeRet(transform.GetFunctionName(), true))
+, HolderFactory(holderFactory)
+, TypeEnv(typeEnv)
+, ProgramBuilder(programBuilder)
+{
+    // Separate streams keep the diagnostics of the two parse attempts apart.
+    TStringStream inputErr;
+    InputRowType = ParseTypeFromYson(TStringBuf{transform.GetInputType()}, ProgramBuilder, inputErr);
+    YQL_ENSURE(InputRowType, "Can't parse cloud function input type: " << inputErr.Str());
+
+    TStringStream outputErr;
+    NMiniKQL::TType* outputType = ParseTypeFromYson(TStringBuf{transform.GetOutputType()}, ProgramBuilder, outputErr);
+    YQL_ENSURE(outputType, "Can't parse cloud function output type: " << outputErr.Str());
+    // A response is decoded as a list of output rows, hence the list wrapper.
+    OutputRowsType = NKikimr::NMiniKQL::TListType::Create(outputType, TypeEnv);
+}
+
+
+// Working state: dispatches poison, transform-success, execute-transform and abort events.
+// NOTE(review): STRICT_STFUNC presumably treats any other event type as an error — confirm
+// against the actor library.
+STRICT_STFUNC(TCloudFunctionTransformActor::StateFunc,
+ hFunc(TEvents::TEvPoison, HandlePoison);
+ hFunc(TCFTransformEvent::TEvTransformSuccess, Handle);
+ hFunc(TCFTransformEvent::TEvExecuteTransform, Handle);
+ hFunc(TEvDq::TEvAbortExecution, Handle);
+)
+
+// Terminal state entered after an error has been reported to the owner:
+// only poison is handled, the switch has no other cases.
+STATEFN(TCloudFunctionTransformActor::DeadState) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvents::TEvPoison, HandlePoison);
+ }
+}
+
+// Routes an abort event to the error path matching its status code:
+// INTERNAL_ERROR and ABORTED have dedicated paths, anything else is a runtime error.
+void TCloudFunctionTransformActor::Handle(TEvDq::TEvAbortExecution::TPtr& ev) {
+    auto* abortEvent = ev->Get();
+    TIssues issues = abortEvent->GetIssues();
+
+    switch (abortEvent->Record.GetStatusCode()) {
+        case Ydb::StatusIds::INTERNAL_ERROR:
+            InternalError(issues);
+            break;
+        case Ydb::StatusIds::ABORTED:
+            Aborted(issues);
+            break;
+        default:
+            RuntimeError(issues);
+            break;
+    }
+}
+
+// Drains the rows currently available in TransformInput into one JSON array and posts it
+// to the cloud function. The HTTP result is delivered asynchronously to OnInvokeFinished(),
+// which turns it into an event for this actor.
+//
+// When there is nothing to send and the input is finished, the transform is completed
+// right away. A row that cannot be serialized reports a runtime error and stops processing.
+void TCloudFunctionTransformActor::Handle(TCFTransformEvent::TEvExecuteTransform::TPtr& ev) {
+    Y_UNUSED(ev);
+
+    auto guard = BindAllocator();
+
+    TString payload;
+    TStringOutput jsonStream{payload};
+    NJson::TJsonWriter jsonWriter{&jsonStream, false};
+
+    jsonWriter.OpenArray();
+
+    bool hasPayload = false;
+    while (TransformInput->HasData() && !TaskOutput->IsFull()) {
+        NKikimr::NMiniKQL::TUnboxedValueVector rowsToTransform;
+        if (TransformInput->PopAll(rowsToTransform)) {
+            hasPayload = true;
+            for (auto&& row : rowsToTransform) {
+                try {
+                    NJsonCodec::WriteValueToJson(jsonWriter, row, InputRowType, NJsonCodec::DefaultPolicy::getInstance().CloudFunction());
+                } catch (const std::exception& ex) {
+                    auto message = TStringBuilder()
+                        << "Failed to convert data to input json for cloud function '"
+                        << TransformName << "': " << ex.what();
+                    RuntimeError(message);
+                    return;
+                }
+            }
+        }
+    }
+    jsonWriter.CloseArray();
+    jsonWriter.Flush();
+
+    TCFReqContext context;
+    // OnInvokeFinished() is static and has no access to the actor's members: it builds its
+    // error messages from the context, so the function name has to travel with the request.
+    context.TransformName = TransformName;
+    if (!TransformInput->HasData() && TransformInput->IsFinished()) {
+        context.LastBatch = true;
+    }
+
+    if (hasPayload) {
+        TransformInProgress = true;
+
+        auto* actorSystem = TActivationContext::ActorSystem();
+        auto headers = IHTTPGateway::THeaders();
+        Gateway->Download(CLOUD_FUNCTION_BASE_URL + TransformName, headers, 10,
+            std::bind(&TCloudFunctionTransformActor::OnInvokeFinished, actorSystem, SelfId(),
+                context,
+                std::placeholders::_1),
+            payload, RETRY_POLICY);
+    } else if (context.LastBatch) {
+        CompleteTransform();
+    }
+}
+
+// Handles a successful function response: parses the body as JSON, decodes it into
+// OutputRowsType, pushes the result to TaskOutput and notifies the owner about new data.
+// A body that cannot be parsed or decoded is reported as a runtime error.
+void TCloudFunctionTransformActor::Handle(TCFTransformEvent::TEvTransformSuccess::TPtr& ev) {
+ auto guard = BindAllocator();
+
+ NKikimr::NUdf::TUnboxedValue transformedData;
+ try {
+ auto body = std::string_view(ev->Get()->Result);
+ NJson::TJsonValue bodyJson;
+ if (!NJson::ReadJsonTree(body, &bodyJson, false)) {
+ YQL_ENSURE(false, "Invalid json");
+ }
+ transformedData = NJsonCodec::ReadJsonValue(bodyJson, OutputRowsType, HolderFactory);
+ } catch (const std::exception& ex) {
+ auto message = TStringBuilder()
+ << "Failed to convert output json from cloud function '"
+ << TransformName << "': " << ex.what();
+ RuntimeError(message);
+ return;
+ }
+
+ // The request is over; DoTransform() may schedule the next batch from now on.
+ TransformInProgress = false;
+
+ // NOTE(review): the IsFull() guard below is commented out, so the data is consumed unconditionally.
+ //if (!TaskOutput->IsFull()) {
+ TaskOutput->Consume(std::move(transformedData));
+ auto newDataEv = new NTransformActor::TEvTransformNewData();
+ Send(Owner, newDataEv);
+ //}
+
+ // The flag was captured when the request was issued: no more input was pending at that time.
+ if (ev->Get()->LastBatch) {
+ CompleteTransform();
+ }
+}
+
+// Schedules a transform step by sending an execute event to this actor itself;
+// does nothing while a previous request is still in progress.
+void TCloudFunctionTransformActor::DoTransform() {
+    if (!TransformInProgress) {
+        Send(SelfId(), new TCFTransformEvent::TEvExecuteTransform());
+    }
+}
+
+// HTTP completion callback. It is static and talks to the actor only through events
+// sent to selfId via the actor system.
+//
+// result holds either the HTTP content (index 0) or a list of issues (index 1):
+//  - 200                -> TEvTransformSuccess carrying the body and the last-batch flag;
+//  - 403 / 404          -> abort event created with Unavailable();
+//  - 429, 500, 503, 504 -> abort event created with Aborted();
+//  - any other code     -> abort event created with Aborted();
+//  - issues             -> abort event created with InternalError(), issues attached.
+// Error texts name the function via reqContext.TransformName.
+void TCloudFunctionTransformActor::OnInvokeFinished(TActorSystem* actorSystem, TActorId selfId,
+    TCFReqContext reqContext,
+    IHTTPGateway::TResult&& result) {
+    switch (result.index()) {
+        case 0U: {
+            auto response = std::get<IHTTPGateway::TContent>(std::move(result));
+            switch (response.HttpResponseCode) {
+                case 200: {
+                    auto event = new TCFTransformEvent::TEvTransformSuccess(std::move(response), reqContext.LastBatch);
+                    actorSystem->Send(new IEventHandle(selfId, TActorId(), event));
+                    break;
+                }
+                case 403:
+                case 404: {
+                    const auto& transformName = reqContext.TransformName;
+                    auto message = TStringBuilder() << "Got '" << response.HttpResponseCode << "' response code from cloud function '"
+                        << transformName << "'";
+                    auto ev = TEvDq::TEvAbortExecution::Unavailable(message);
+                    actorSystem->Send(new IEventHandle(selfId, TActorId(), ev.Release()));
+                    break;
+                }
+                case 429:
+                case 500:
+                case 503:
+                case 504: {
+                    const auto& transformName = reqContext.TransformName;
+                    auto message = TStringBuilder() << "Got '" << response.HttpResponseCode << "' response code from cloud function '"
+                        << transformName << "' and retry has been out";
+                    auto ev = TEvDq::TEvAbortExecution::Aborted(message);
+                    actorSystem->Send(new IEventHandle(selfId, TActorId(), ev.Release()));
+                    break;
+                }
+                default: {
+                    const auto& transformName = reqContext.TransformName;
+                    // The closing quote after the function name was missing in this message.
+                    auto message = TStringBuilder() << "Got unexpected response code (" << response.HttpResponseCode
+                        << ") from cloud function '" << transformName << "'";
+                    auto ev = TEvDq::TEvAbortExecution::Aborted(message);
+                    actorSystem->Send(new IEventHandle(selfId, TActorId(), ev.Release()));
+                    break;
+                }
+            }
+            break;
+        }
+        case 1U: {
+            const auto& transformName = reqContext.TransformName;
+            // The closing quote after the function name was missing in this message as well.
+            auto message = TStringBuilder() << "Internal error during call cloud function '" << transformName << "'";
+            auto ev = TEvDq::TEvAbortExecution::InternalError(message, std::get<TIssues>(result));
+            actorSystem->Send(new IEventHandle(selfId, TActorId(), ev.Release()));
+            break;
+        }
+    }
+}
+
+
+// Terminates the actor on poison; handled both in the working state and in DeadState.
+void TCloudFunctionTransformActor::HandlePoison(NActors::TEvents::TEvPoison::TPtr&) {
+ PassAway();
+}
+
+// Logs the message, wraps it together with subIssues into issues the same way
+// TEvAbortExecution::Unavailable() does, and reports them as a runtime error.
+void TCloudFunctionTransformActor::RuntimeError(const TString& message, const TIssues& subIssues) {
+    LOG_E(message);
+    const auto abortEvent = TEvDq::TEvAbortExecution::Unavailable(message, subIssues);
+    RuntimeError(abortEvent->GetIssues());
+}
+
+// Reports the issues to the owner with UNAVAILABLE status and switches to DeadState.
+void TCloudFunctionTransformActor::RuntimeError(const TIssues& issues) {
+    TransformInProgress = false;
+    Send(Owner, new TEvDq::TEvAbortExecution(Ydb::StatusIds::UNAVAILABLE, issues));
+
+    Become(&TCloudFunctionTransformActor::DeadState);
+}
+
+// Logs the message, wraps it together with subIssues into issues the same way
+// TEvAbortExecution::InternalError() does, and reports them as an internal error.
+void TCloudFunctionTransformActor::InternalError(const TString& message, const TIssues& subIssues) {
+    LOG_E(message);
+    const auto abortEvent = TEvDq::TEvAbortExecution::InternalError(message, subIssues);
+    InternalError(abortEvent->GetIssues());
+}
+
+// Reports the issues to the owner with INTERNAL_ERROR status and switches to DeadState.
+void TCloudFunctionTransformActor::InternalError(const TIssues& issues) {
+    TransformInProgress = false;
+    Send(Owner, new TEvDq::TEvAbortExecution(Ydb::StatusIds::INTERNAL_ERROR, issues));
+
+    Become(&TCloudFunctionTransformActor::DeadState);
+}
+
+// Logs the message, wraps it together with subIssues into issues the same way
+// TEvAbortExecution::Aborted() does, and reports them as an abort.
+void TCloudFunctionTransformActor::Aborted(const TString& message, const TIssues& subIssues) {
+    LOG_E(message);
+    const auto abortEvent = TEvDq::TEvAbortExecution::Aborted(message, subIssues);
+    Aborted(abortEvent->GetIssues());
+}
+
+// Reports the issues to the owner with ABORTED status and switches to DeadState.
+void TCloudFunctionTransformActor::Aborted(const TIssues& issues) {
+    TransformInProgress = false;
+    Send(Owner, new TEvDq::TEvAbortExecution(Ydb::StatusIds::ABORTED, issues));
+
+    Become(&TCloudFunctionTransformActor::DeadState);
+}
+
+
+// Finishes the task output and tells the owner that the transform is complete.
+void TCloudFunctionTransformActor::CompleteTransform() {
+    TransformInProgress = false;
+    TaskOutput->Finish();
+
+    Send(Owner, new NTransformActor::TEvTransformCompleted());
+}
+
+// Returns the guard produced by TypeEnv.BindAllocator(); the handlers hold it
+// while they create or read unboxed values.
+TGuard<NKikimr::NMiniKQL::TScopedAlloc> TCloudFunctionTransformActor::BindAllocator() {
+ return TypeEnv.BindAllocator();
+}
+
+// Creates a cloud function transform actor from the factory arguments and returns it
+// through both of its interfaces: the transform interface and the actor interface.
+std::pair<IDqTransformActor*, NActors::IActor*> CreateCloudFunctionTransformActor(const NDqProto::TDqTransform& transform, IHTTPGateway::TPtr gateway, TDqTransformActorFactory::TArguments&& args) {
+    auto* const transformActor = new TCloudFunctionTransformActor(
+        args.ComputeActorId,
+        transform,
+        gateway,
+        args.TransformInput,
+        args.TransformOutput,
+        args.HolderFactory,
+        args.TypeEnv,
+        args.ProgramBuilder);
+    return {transformActor, transformActor};
+}
+
+} \ No newline at end of file
diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h
new file mode 100644
index 0000000000..85c9b9f073
--- /dev/null
+++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h
@@ -0,0 +1,122 @@
+#pragma once
+
+#include <ydb/library/yql/dq/actors/dq.h>
+#include <ydb/library/yql/dq/actors/transform/dq_transform_actor.h>
+#include <ydb/library/yql/dq/actors/transform/dq_transform_events.h>
+#include <ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h>
+#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
+#include <ydb/library/yql/dq/runtime/dq_output_consumer.h>
+#include <ydb/library/yql/dq/runtime/dq_input_channel.h>
+#include <ydb/library/yql/dq/runtime/dq_output_channel.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
+
+#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/hfunc.h>
+#include <library/cpp/actors/core/actor.h>
+
+namespace NYql::NDq {
+
+using namespace NActors;
+using namespace NKikimr;
+
+// Actor that transforms task output by calling a cloud function over HTTP:
+// rows taken from TransformInput are sent to the function as a JSON array, and the
+// decoded response is pushed to TaskOutput. The owner is notified about new data,
+// completion and errors via events.
+class TCloudFunctionTransformActor : public NActors::TActor<TCloudFunctionTransformActor>
+                                   , public IDqTransformActor
+                                   , public TThrRefBase
+{
+public:
+    // Private events the actor receives from itself and from the HTTP completion callback.
+    struct TCFTransformEvent {
+        enum {
+            EV_BEGIN = EventSpaceBegin(TEvents::ES_PRIVATE),
+            EV_TRANSFORM_SUCCESS = EV_BEGIN,
+            EV_EXECUTE_TRANSFORM
+        };
+
+        // A successful (HTTP 200) function response.
+        struct TEvTransformSuccess : public TEventLocal<TEvTransformSuccess, EV_TRANSFORM_SUCCESS> {
+            TEvTransformSuccess(IHTTPGateway::TContent&& result, bool lastBatch)
+                : Result(std::move(result))
+                , LastBatch(lastBatch)
+            {}
+
+            IHTTPGateway::TContent Result;
+            bool LastBatch = false;
+        };
+
+        // A request to send the next batch of input rows to the function.
+        struct TEvExecuteTransform : public TEventLocal<TEvExecuteTransform, EV_EXECUTE_TRANSFORM> {
+            TEvExecuteTransform() {}
+        };
+    };
+
+public:
+    using TPtr = TIntrusivePtr<TCloudFunctionTransformActor>;
+
+    static constexpr char ActorName[] = "YQL_DQ_CLOUD_FUNC_TRANSFORM";
+
+    TCloudFunctionTransformActor(TActorId owner,
+                                 NDqProto::TDqTransform transform, IHTTPGateway::TPtr gateway,
+                                 IDqOutputChannel::TPtr transformInput,
+                                 IDqOutputConsumer::TPtr taskOutput,
+                                 const NKikimr::NMiniKQL::THolderFactory& holderFactory,
+                                 const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
+                                 NKikimr::NMiniKQL::TProgramBuilder& programBuilder);
+
+    // IDqTransformActor: schedules a transform step unless one is already in progress.
+    // `override` makes the compiler verify that the signature matches the interface.
+    void DoTransform() override;
+
+private:
+    STATEFN(StateFunc);
+    void Handle(TCFTransformEvent::TEvTransformSuccess::TPtr& ev);
+    void Handle(TCFTransformEvent::TEvExecuteTransform::TPtr& ev);
+    void Handle(TEvDq::TEvAbortExecution::TPtr& ev);
+
+    STATEFN(DeadState);
+    void HandlePoison(NActors::TEvents::TEvPoison::TPtr&);
+
+private:
+    // Per-request data handed to the static HTTP completion callback.
+    struct TCFReqContext {
+        TCFReqContext() {}
+        // explicit: a bare bool must not silently convert into a request context.
+        explicit TCFReqContext(bool lastBatch): LastBatch(lastBatch) {}
+
+        TString TransformName;
+        bool LastBatch = false;
+    };
+
+    static void OnInvokeFinished(TActorSystem* actorSystem, TActorId selfId, TCFReqContext reqContext, IHTTPGateway::TResult&& result);
+
+    // Each pair below reports an error to the owner and moves the actor to DeadState;
+    // the TString overloads additionally log the message.
+    void InternalError(const TIssues& issues);
+    void InternalError(const TString& message, const TIssues& subIssues = {});
+
+    void RuntimeError(const TIssues& issues);
+    void RuntimeError(const TString& message, const TIssues& subIssues = {});
+
+    void Aborted(const TIssues& issues);
+    void Aborted(const TString& message, const TIssues& subIssues = {});
+
+    void CompleteTransform();
+
+public:
+    TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator();
+
+private:
+    NActors::TActorId Owner; // compute actor Id
+
+    NDqProto::TDqTransform Transform;
+    IHTTPGateway::TPtr Gateway;
+
+    IDqOutputChannel::TPtr TransformInput;
+    IDqOutputConsumer::TPtr TaskOutput;
+
+    TString TransformName; // URL-escaped function name
+
+    bool TransformInProgress = false; // true while an HTTP request is outstanding
+
+    const NKikimr::NMiniKQL::THolderFactory& HolderFactory;
+    const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv;
+    NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder;
+
+    NMiniKQL::TType* InputRowType = nullptr;   // type of a single input row
+    NMiniKQL::TType* OutputRowsType = nullptr; // list of output rows
+};
+
+std::pair<IDqTransformActor*, NActors::IActor*> CreateCloudFunctionTransformActor(const NDqProto::TDqTransform& transform, IHTTPGateway::TPtr gateway, TDqTransformActorFactory::TArguments&& args);
+
+} \ No newline at end of file
diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp
new file mode 100644
index 0000000000..ab4d33e308
--- /dev/null
+++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp
@@ -0,0 +1,15 @@
+#include "cloud_function_transform_factory.h"
+#include "cloud_function_transform.h"
+
+#include <ydb/library/yql/dq/proto/dq_tasks.pb.h>
+
+namespace NYql::NDq {
+
+// Registers a creator for TRANSFORM_CLOUD_FUNCTION with `factory`. The creator keeps
+// its own copy of `gateway` and forwards to CreateCloudFunctionTransformActor.
+void RegisterTransformCloudFunction(TDqTransformActorFactory& factory, IHTTPGateway::TPtr gateway) {
+    auto creator = [gateway](const NDqProto::TDqTransform& transform, TDqTransformActorFactory::TArguments&& args) {
+        return CreateCloudFunctionTransformActor(transform, gateway, std::move(args));
+    };
+    factory.Register(NDqProto::ETransformType::TRANSFORM_CLOUD_FUNCTION, std::move(creator));
+}
+
+} \ No newline at end of file
diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h
new file mode 100644
index 0000000000..d17f556c73
--- /dev/null
+++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include <ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h>
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+
+namespace NYql::NDq {
+
+// Registers the cloud-function transform creator with `factory`; `gateway` is the HTTP
+// gateway handed to every actor the creator builds.
+void RegisterTransformCloudFunction(TDqTransformActorFactory& factory, IHTTPGateway::TPtr gateway);
+
+} \ No newline at end of file
diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp
new file mode 100644
index 0000000000..df3dfe2a9b
--- /dev/null
+++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp
@@ -0,0 +1,222 @@
+#include "cloud_function_transform.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+#include <library/cpp/actors/testlib/test_runtime.h>
+
+#include <ydb/library/yql/dq/runtime/dq_output_consumer.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/minikql/computation/mkql_value_builder.h>
+#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
+#include <ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.h>
+
+#include <thread>
+#include <chrono>
+
+namespace {
+
+using namespace NYql;
+using namespace NActors;
+using namespace NKikimr;
+using namespace NMiniKQL;
+using namespace NDq;
+
+using namespace std::chrono_literals;
+
+// Bundle returned by StartUpTransformSystem: the transform's input/output channels,
+// the actor object and the ids of the actor and of the edge actor used as its owner.
+struct TTransformSystem {
+    IDqOutputChannel::TPtr InputChannel;
+    IDqOutputChannel::TPtr OutputChannel;
+    // Raw pointer: give it a defined value even when the struct is default-initialized.
+    TCloudFunctionTransformActor* TransformActor = nullptr;
+    NActors::TActorId TransformActorId;
+    NActors::TActorId ComputeActorId;
+};
+
+// Unit tests for TCloudFunctionTransformActor. Each test registers the actor in a test
+// actor runtime, feeds rows through an input channel and answers its HTTP calls from a
+// mock gateway; an edge actor plays the role of the owning compute actor.
+class TCloudFunctionTransformTest: public TTestBase {
+public:
+    UNIT_TEST_SUITE(TCloudFunctionTransformTest);
+    UNIT_TEST(TestEmptyChannel)
+    UNIT_TEST(TestMultiplicationTransform)
+    UNIT_TEST(TestSeveralIterations)
+    UNIT_TEST_SUITE_END();
+
+    TCloudFunctionTransformTest()
+        : Alloc(std::make_unique<TScopedAlloc>(NKikimr::TAlignedPagePoolCounters(), false))
+        , TypeEnv(std::make_unique<TTypeEnvironment>(*Alloc))
+        , FunctionRegistry(CreateFunctionRegistry(CreateBuiltinRegistry()))
+        , ProgramBuilder(*TypeEnv, *FunctionRegistry)
+        , MemInfo("Mem")
+        , Gateway(IHTTPMockGateway::Make())
+    {
+        // These depend on Alloc and MemInfo, so they are created in the body.
+        HolderFactory = std::make_unique<NMiniKQL::THolderFactory>(Alloc->Ref(), MemInfo);
+        ValueBuilder = std::make_unique<TDefaultValueBuilder>(*HolderFactory);
+    }
+
+    // Creates the channels and the transform actor for function `transformName` and
+    // registers the actor on node `nodeId`. Input rows are Struct<A:bool, C:ui32>,
+    // output rows are Struct<A:bool, C:ui64>.
+    TTransformSystem StartUpTransformSystem(ui32 nodeId, TString transformName) {
+        auto system = TTransformSystem();
+
+        NDqProto::TDqTransform taskTransform;
+
+        taskTransform.SetType(NDqProto::ETransformType::TRANSFORM_CLOUD_FUNCTION);
+        taskTransform.SetFunctionName(transformName);
+
+        TStructMember inMembers[] = {
+            {"A", TDataType::Create(NUdf::TDataType<bool>::Id, *TypeEnv)},
+            {"C", TDataType::Create(NUdf::TDataType<ui32>::Id, *TypeEnv)}
+        };
+        NMiniKQL::TType* inputType = TStructType::Create(2, inMembers, *TypeEnv);
+        taskTransform.SetInputType(NYql::NCommon::WriteTypeToYson(inputType));
+
+        TStructMember outMembers[] = {
+            {"A", TDataType::Create(NUdf::TDataType<bool>::Id, *TypeEnv)},
+            {"C", TDataType::Create(NUdf::TDataType<ui64>::Id, *TypeEnv)}
+        };
+        NMiniKQL::TType* outputType = TStructType::Create(2, outMembers, *TypeEnv);
+        taskTransform.SetOutputType(NYql::NCommon::WriteTypeToYson(outputType));
+
+        TDqOutputChannelSettings settings;
+        settings.MaxStoredBytes = 10000;
+        settings.MaxChunkBytes = 2_MB;
+        settings.TransportVersion = NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0;
+        settings.CollectProfileStats = false;
+        settings.AllowGeneratorsInUnboxedValues = false;
+
+        // Channel log output is not interesting for these tests.
+        TLogFunc logger = [](const TString& message) {
+            Y_UNUSED(message);
+        };
+
+        system.InputChannel = CreateDqOutputChannel(nodeId + 100, inputType, *TypeEnv, *HolderFactory, settings, logger);
+        system.OutputChannel = CreateDqOutputChannel(nodeId + 400, outputType, *TypeEnv, *HolderFactory, settings, logger);
+        auto outputConsumer = CreateOutputMapConsumer(system.OutputChannel);
+
+        system.ComputeActorId = ActorRuntime_->AllocateEdgeActor();
+        system.TransformActor = new TCloudFunctionTransformActor(system.ComputeActorId,
+                                                                 taskTransform, Gateway,
+                                                                 system.InputChannel, outputConsumer,
+                                                                 *HolderFactory, *TypeEnv, ProgramBuilder);
+        system.TransformActorId = ActorRuntime_->Register(system.TransformActor, nodeId);
+
+        return system;
+    }
+
+    // A fresh runtime per test keeps the tests independent of each other.
+    void SetUp() override {
+        const ui32 nodesNumber = 6;
+        ActorRuntime_.Reset(new NActors::TTestActorRuntimeBase(nodesNumber, false));
+        ActorRuntime_->Initialize();
+    }
+
+    void TearDown() override {
+        ActorRuntime_.Reset();
+    }
+
+    // A finished input channel without rows must complete the transform and finish the output.
+    void TestEmptyChannel() {
+        auto context = StartUpTransformSystem(0, "emptyChannel");
+
+        context.InputChannel->Finish();
+        auto executeEv = new TCloudFunctionTransformActor::TCFTransformEvent::TEvExecuteTransform();
+        ActorRuntime_->Send(new IEventHandle(context.TransformActorId, context.ComputeActorId, executeEv));
+
+        auto event = ActorRuntime_->GrabEdgeEvent<NTransformActor::TEvTransformCompleted>(context.ComputeActorId, TDuration::Seconds(1));
+
+        UNIT_ASSERT(event);
+        UNIT_ASSERT_VALUES_EQUAL(true, context.OutputChannel->IsFinished());
+    }
+
+
+    // One row in, one mocked response row out, then completion.
+    void TestMultiplicationTransform() {
+        auto context = StartUpTransformSystem(0, "multiplyC");
+
+        IHTTPMockGateway::TDataResponse multiplyCResponse = []() {
+            return IHTTPGateway::TContent("[{\"A\":true,\"C\":1110}]", 200);
+        };
+
+        // The mock is keyed by (url, headers, body), so the body below must match
+        // exactly what the actor posts for the pushed row.
+        auto url = "https://functions.yandexcloud.net/multiplyC";
+        auto headers = IHTTPGateway::THeaders();
+        auto postBody = "[{\"A\":\"true\",\"C\":\"555\"}]";
+        Gateway->AddDownloadResponse(url, headers, postBody, multiplyCResponse);
+
+        NUdf::TUnboxedValue* items;
+        auto value = ValueBuilder->NewArray(2, items);
+        items[0] = NUdf::TUnboxedValuePod(true);
+        items[1] = NUdf::TUnboxedValuePod(ui32(555));
+
+        context.InputChannel->Push(std::move(value));
+        context.InputChannel->Finish();
+
+        auto executeEv = new TCloudFunctionTransformActor::TCFTransformEvent::TEvExecuteTransform();
+        ActorRuntime_->Send(new IEventHandle(context.TransformActorId, context.ComputeActorId, executeEv));
+        auto event = ActorRuntime_->GrabEdgeEvent<NTransformActor::TEvTransformCompleted>(context.ComputeActorId, TDuration::Seconds(1));
+
+        UNIT_ASSERT(event);
+        NKikimr::NMiniKQL::TUnboxedValueVector transformedRows;
+        context.OutputChannel->PopAll(transformedRows);
+        UNIT_ASSERT_VALUES_EQUAL(transformedRows.size(), 1);
+        UNIT_ASSERT_VALUES_EQUAL(true, context.OutputChannel->IsFinished());
+        transformedRows.clear();
+    }
+
+    // Two rows processed in two separate executions: the first yields a "new data"
+    // notification, the second (after Finish) yields completion.
+    void TestSeveralIterations() {
+        auto context = StartUpTransformSystem(0, "multiplyCBatch");
+
+        IHTTPMockGateway::TDataResponse multiplyCResponse1 = []() {
+            return IHTTPGateway::TContent("[{\"A\":true,\"C\":860}]", 200);
+        };
+
+        IHTTPMockGateway::TDataResponse multiplyCResponse2 = []() {
+            return IHTTPGateway::TContent("[{\"A\":false,\"C\":800}]", 200);
+        };
+
+        auto url = "https://functions.yandexcloud.net/multiplyCBatch";
+        auto headers = IHTTPGateway::THeaders();
+        auto postBody1 = "[{\"A\":\"true\",\"C\":\"430\"}]";
+        Gateway->AddDownloadResponse(url, headers, postBody1, multiplyCResponse1);
+        auto postBody2 = "[{\"A\":\"false\",\"C\":\"800\"}]";
+        Gateway->AddDownloadResponse(url, headers, postBody2, multiplyCResponse2);
+
+        NUdf::TUnboxedValue* items1;
+        auto value1 = ValueBuilder->NewArray(2, items1);
+        items1[0] = NUdf::TUnboxedValuePod(true);
+        items1[1] = NUdf::TUnboxedValuePod(ui32(430));
+        context.InputChannel->Push(std::move(value1));
+        auto executeEv1 = new TCloudFunctionTransformActor::TCFTransformEvent::TEvExecuteTransform();
+        ActorRuntime_->Send(new IEventHandle(context.TransformActorId, context.ComputeActorId, executeEv1));
+
+        // Check the grab result: without the assert a timeout here would go unnoticed
+        // and the second iteration would run against an unknown actor state.
+        auto newDataEvent = ActorRuntime_->GrabEdgeEvent<NTransformActor::TEvTransformNewData>(context.ComputeActorId, TDuration::MilliSeconds(500));
+        UNIT_ASSERT(newDataEvent);
+
+        NUdf::TUnboxedValue* items2;
+        auto value2 = ValueBuilder->NewArray(2, items2);
+        items2[0] = NUdf::TUnboxedValuePod(false);
+        items2[1] = NUdf::TUnboxedValuePod(ui32(800));
+        context.InputChannel->Push(std::move(value2));
+        context.InputChannel->Finish();
+
+        auto executeEv2 = new TCloudFunctionTransformActor::TCFTransformEvent::TEvExecuteTransform();
+        ActorRuntime_->Send(new IEventHandle(context.TransformActorId, context.ComputeActorId, executeEv2));
+
+        auto event = ActorRuntime_->GrabEdgeEvent<NTransformActor::TEvTransformCompleted>(context.ComputeActorId, TDuration::Seconds(1));
+
+        UNIT_ASSERT(event);
+        NKikimr::NMiniKQL::TUnboxedValueVector transformedRows;
+        context.OutputChannel->PopAll(transformedRows);
+        UNIT_ASSERT_VALUES_EQUAL(transformedRows.size(), 2);
+        UNIT_ASSERT_VALUES_EQUAL(true, context.OutputChannel->IsFinished());
+        transformedRows.clear();
+    }
+
+private:
+    THolder<NActors::TTestActorRuntimeBase> ActorRuntime_;
+
+    // Declaration order matters: later members are constructed from earlier ones.
+    std::unique_ptr<NMiniKQL::TScopedAlloc> Alloc;
+    std::unique_ptr<NMiniKQL::TTypeEnvironment> TypeEnv;
+
+    TIntrusivePtr<IFunctionRegistry> FunctionRegistry;
+    NMiniKQL::TProgramBuilder ProgramBuilder;
+    NMiniKQL::TMemoryUsageInfo MemInfo;
+    std::unique_ptr<NMiniKQL::THolderFactory> HolderFactory;
+
+    std::unique_ptr<TDefaultValueBuilder> ValueBuilder;
+    IHTTPMockGateway::TPtr Gateway;
+};
+
+UNIT_TEST_SUITE_REGISTRATION(TCloudFunctionTransformTest)
+} \ No newline at end of file
diff --git a/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.darwin.txt b/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.darwin.txt
new file mode 100644
index 0000000000..d3fffe049d
--- /dev/null
+++ b/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.darwin.txt
@@ -0,0 +1,55 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(library-yql-providers-cloud_function-actors-ut)
+target_compile_options(library-yql-providers-cloud_function-actors-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(library-yql-providers-cloud_function-actors-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/cloud_function/actors
+)
+target_link_libraries(library-yql-providers-cloud_function-actors-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ providers-cloud_function-actors
+ cpp-actors-testlib
+ yql-minikql-invoke_builtins
+ udf-service-exception_policy
+ yql-minikql-computation
+ providers-common-http_gateway
+ common-http_gateway-mock
+)
+target_sources(library-yql-providers-cloud_function-actors-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp
+)
+add_test(
+ NAME
+ library-yql-providers-cloud_function-actors-ut
+ COMMAND
+ library-yql-providers-cloud_function-actors-ut
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+target_link_flags(library-yql-providers-cloud_function-actors-ut
+ PUBLIC
+ -Wl,-no_deduplicate
+ -Wl,-sdk_version,10.15
+ -fPIC
+ -fPIC
+ -framework
+ CoreFoundation
+ -framework
+ CoreFoundation
+)
+vcs_info(library-yql-providers-cloud_function-actors-ut)
diff --git a/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.linux.txt b/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.linux.txt
new file mode 100644
index 0000000000..c159a94b38
--- /dev/null
+++ b/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.linux.txt
@@ -0,0 +1,56 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(library-yql-providers-cloud_function-actors-ut)
+target_compile_options(library-yql-providers-cloud_function-actors-ut PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(library-yql-providers-cloud_function-actors-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/cloud_function/actors
+)
+target_link_libraries(library-yql-providers-cloud_function-actors-ut PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-lfalloc
+ library-cpp-cpuid_check
+ cpp-testing-unittest_main
+ providers-cloud_function-actors
+ cpp-actors-testlib
+ yql-minikql-invoke_builtins
+ udf-service-exception_policy
+ yql-minikql-computation
+ providers-common-http_gateway
+ common-http_gateway-mock
+)
+target_sources(library-yql-providers-cloud_function-actors-ut PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp
+)
+add_test(
+ NAME
+ library-yql-providers-cloud_function-actors-ut
+ COMMAND
+ library-yql-providers-cloud_function-actors-ut
+ --print-before-suite
+ --print-before-test
+ --fork-tests
+ --print-times
+ --show-fails
+)
+target_link_flags(library-yql-providers-cloud_function-actors-ut
+ PUBLIC
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+vcs_info(library-yql-providers-cloud_function-actors-ut)
diff --git a/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.txt b/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.txt
new file mode 100644
index 0000000000..a681d385f3
--- /dev/null
+++ b/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.txt
@@ -0,0 +1,13 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (APPLE)
+ include(CMakeLists.darwin.txt)
+elseif (UNIX)
+ include(CMakeLists.linux.txt)
+endif()
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/CMakeLists.txt b/ydb/library/yql/providers/common/http_gateway/mock/CMakeLists.txt
new file mode 100644
index 0000000000..2926e538fd
--- /dev/null
+++ b/ydb/library/yql/providers/common/http_gateway/mock/CMakeLists.txt
@@ -0,0 +1,21 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(common-http_gateway-mock)
+target_compile_options(common-http_gateway-mock PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(common-http_gateway-mock PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ providers-common-http_gateway
+)
+target_sources(common-http_gateway-mock PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
+)
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
new file mode 100644
index 0000000000..194bf2c6e7
--- /dev/null
+++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp
@@ -0,0 +1,92 @@
+#include "yql_http_mock_gateway.h"
+
+#include <ydb/library/yql/utils/yql_panic.h>
+
+namespace NYql {
+namespace {
+
+using TKeyType = std::tuple<TString, IHTTPGateway::THeaders, TString>;
+
+// Hash functor for TKeyType, i.e. the (url, headers, body) tuple that keys the mock's
+// response table.
+class TKeyHash {
+public:
+    TKeyHash() : Hash() {}
+
+    // Seeds the hash with url and body, then folds in every header in order.
+    size_t operator()(const TKeyType& key) const {
+        // A plain loop instead of std::accumulate: this file does not include
+        // <numeric> itself, so the algorithm was only available transitively.
+        size_t result = CombineHashes(Hash(std::get<0U>(key)), Hash(std::get<2U>(key)));
+        for (const TString& header : std::get<1U>(key)) {
+            result = CombineHashes(result, Hash(header));
+        }
+        return result;
+    }
+public:
+    const std::hash<TString> Hash;
+};
+
+// In-memory IHTTPMockGateway: Download() is answered synchronously from responses
+// registered in advance, either per exact (url, headers, body) key or via one default handler.
+class THTTPMockGateway : public IHTTPMockGateway {
+friend class IHTTPMockGateway;
+public:
+    using TPtr = std::shared_ptr<THTTPMockGateway>;
+
+    THTTPMockGateway() {
+    }
+
+    ~THTTPMockGateway() {
+    }
+
+    // Invokes `callback` once for every response registered under (url, headers, data),
+    // in registration order. Without a registered key the default response is used;
+    // without a default either, the call fails via YQL_ENSURE.
+    // `expectedSize` and `retryPolicy` are ignored.
+    void Download(
+        TString url,
+        IHTTPGateway::THeaders headers,
+        std::size_t expectedSize,
+        IHTTPGateway::TOnResult callback,
+        TString data,
+        IRetryPolicy<long>::TPtr retryPolicy) override
+    {
+
+        Y_UNUSED(expectedSize);
+        Y_UNUSED(retryPolicy);
+
+        // Single lookup; the registered handlers are invoked in place, not copied.
+        const auto it = RequestsResponse.find(TKeyType(url, headers, data));
+        if (it != RequestsResponse.end()) {
+            for (const auto& response : it->second) {
+                callback(response());
+            }
+        } else if (DefaultResponse) {
+            callback(DefaultResponse(url, headers, data));
+        } else {
+            YQL_ENSURE(false, "There isn't any response callback at url " + url);
+        }
+    }
+
+    // Streaming download is not mocked: the call is a no-op and neither callback is invoked.
+    void Download(
+        TString ,
+        IHTTPGateway::THeaders ,
+        std::size_t ,
+        IHTTPGateway::TOnNewDataPart ,
+        IHTTPGateway::TOnDowloadFinsh ) override {
+    }
+
+    // Sets (replacing any previous one) the handler used when no exact key matches.
+    void AddDefaultResponse(TDataDefaultResponse response) override {
+        DefaultResponse = response;
+    }
+
+    // Appends `response` to the list of responses for the exact (url, headers, data) key.
+    void AddDownloadResponse(
+        TString url,
+        IHTTPGateway::THeaders headers,
+        TString data,
+        TDataResponse response) override {
+
+        auto& entry = RequestsResponse[TKeyType(url, headers, data)];
+        entry.emplace_back(std::move(response));
+    }
+
+private:
+    std::unordered_map<TKeyType, std::vector<TDataResponse>, TKeyHash> RequestsResponse;
+    TDataDefaultResponse DefaultResponse;
+};
+}
+
+
+// Creates a THTTPMockGateway with no responses registered yet.
+IHTTPMockGateway::TPtr IHTTPMockGateway::Make() {
+ return std::make_shared<THTTPMockGateway>();
+}
+
+} \ No newline at end of file
diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.h b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.h
new file mode 100644
index 0000000000..447d608384
--- /dev/null
+++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.h
@@ -0,0 +1,25 @@
+#pragma once
+
+#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
+
+namespace NYql {
+// IHTTPGateway extension for tests: responses are registered up front and handed
+// back by the gateway instead of performing real HTTP requests.
+class IHTTPMockGateway : public IHTTPGateway {
+public:
+ using TPtr = std::shared_ptr<IHTTPMockGateway>;
+
+ virtual ~IHTTPMockGateway() = default;
+
+ // Creates the mock implementation.
+ static TPtr Make();
+
+ // Fallback handler: receives the request's url, headers and body.
+ using TDataDefaultResponse = std::function<IHTTPGateway::TResult(TString, IHTTPGateway::THeaders, TString)>;
+ // Handler for one specific request; takes no arguments.
+ using TDataResponse = std::function<IHTTPGateway::TResult()>;
+
+ // Registers the handler used for requests that have no specific response.
+ virtual void AddDefaultResponse(TDataDefaultResponse responseCallback) = 0;
+
+ // Registers a response for the request identified by (url, headers, data).
+ virtual void AddDownloadResponse(
+ TString url,
+ IHTTPGateway::THeaders headers,
+ TString data,
+ TDataResponse responseCallback) = 0;
+};
+} \ No newline at end of file
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
index ffe7a3e6b3..44f076536a 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp
@@ -2,6 +2,7 @@
#include <contrib/libs/curl/include/curl/curl.h>
#include <util/stream/str.h>
+#include <util/string/builder.h>
#include <util/generic/size_literals.h>
#include <thread>
@@ -112,12 +113,15 @@ private:
if (CURLE_OK != result)
return Fail(TIssue(curl_easy_strerror(result)));
+ long httpResponseCode = 0;
+ curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode);
+
const std::unique_lock lock(SyncCallbacks);
while (!Callbacks.empty()) {
if (1U == Callbacks.size())
- Callbacks.top()(IHTTPGateway::TContent(std::move(Buffer)));
+ Callbacks.top()(IHTTPGateway::TContent(std::move(Buffer), httpResponseCode));
else
- Callbacks.top()(IHTTPGateway::TContent(Buffer));
+ Callbacks.top()(IHTTPGateway::TContent(Buffer, httpResponseCode));
Callbacks.pop();
}
}
@@ -518,22 +522,32 @@ private:
}
-IHTTPGateway::TContent::TContent(TString&& data)
+IHTTPGateway::TContent::TContent(TString&& data, long httpResponseCode)
: TString(std::move(data))
+ , HttpResponseCode(httpResponseCode)
{
if (!empty()) {
THTTPMultiGateway::OutputSize.fetch_add(size());
}
}
-IHTTPGateway::TContent::TContent(const TString& data)
+IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode)
: TString(data)
+ , HttpResponseCode(httpResponseCode)
{
if (!empty()) {
THTTPMultiGateway::OutputSize.fetch_add(size());
}
}
+IHTTPGateway::TContent::TContent(TString&& data)
+ : TContent(std::move(data), 0)
+{}
+
+IHTTPGateway::TContent::TContent(const TString& data)
+ : TContent(data, 0)
+{}
+
TString IHTTPGateway::TContent::Extract() {
if (!empty()) {
THTTPMultiGateway::OutputSize.fetch_sub(size());
diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
index 991f0b2768..8872270a09 100644
--- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
+++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h
@@ -28,6 +28,8 @@ public:
class TContent : private TString {
friend class TEasyCurl;
public:
+ TContent(TString&& data, long httpResponseCode);
+ TContent(const TString& data, long httpResponseCode);
TContent(TString&& data);
TContent(const TString& data);
@@ -44,6 +46,9 @@ public:
private:
TContent(const TContent&) = delete;
TContent& operator=(const TContent&) = delete;
+
+ public:
+ long HttpResponseCode;
};
using TResult = std::variant<TContent, TIssues>;
diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
index ec404c0a6d..9531a0204b 100644
--- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp
+++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp
@@ -359,10 +359,6 @@ namespace NYql::NDqs {
FillOutputDesc(*taskDesc.AddOutputs(), output);
}
- auto& transform = *taskDesc.MutableOutputTransform();
- transform.SetType(task.OutputTransform.Type);
- transform.SetFunctionName(task.OutputTransform.FunctionName);
-
auto& program = *taskDesc.MutableProgram();
program.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0);
TString programStr;
@@ -479,9 +475,11 @@ namespace NYql::NDqs {
task.Inputs[dqSourceInputIndex].SourceSettings = sourceSettings;
task.Inputs[dqSourceInputIndex].SourceType = sourceType;
}
- auto& transform = task.OutputTransform;
- transform.Type = stageSettings.TransformType;
- transform.FunctionName = stageSettings.TransformName;
+ if (stageSettings.IsExternalFunction) {
+ auto& transform = task.OutputTransform;
+ transform.Type = stageSettings.TransformType;
+ transform.FunctionName = stageSettings.TransformName;
+ }
}
}
return !parts.empty();