diff options
author | hrustyashko <hrustyashko@yandex-team.ru> | 2022-03-25 20:59:44 +0300 |
---|---|---|
committer | hrustyashko <hrustyashko@yandex-team.ru> | 2022-03-25 20:59:44 +0300 |
commit | 25ded0237971981b125ad435b3845969585a2ad7 (patch) | |
tree | c4009b93b45d4f380d19aa37629615fc0a8bab43 | |
parent | 67e2ad960068f40e2dbcbce84908b06b91603a19 (diff) | |
download | ydb-25ded0237971981b125ad435b3845969585a2ad7.tar.gz |
Actor для вызова трансформации через Cloud Function
ref:58a020f15a89ef366baf4da257052687121d2101
30 files changed, 1165 insertions, 17 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index e3decdfe93..26d76167eb 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -1192,12 +1192,14 @@ add_subdirectory(ydb/core/yq/libs/hmac/ut) add_subdirectory(ydb/core/yq/libs/result_formatter/ut) add_subdirectory(ydb/core/yq/libs/signer/ut) add_subdirectory(ydb/core/yq/libs/test_connection/ut) +add_subdirectory(ydb/library/yql/dq/actors/transform) add_subdirectory(ydb/library/yql/dq/actors/compute/ut) add_subdirectory(ydb/library/yql/dq/runtime/ut) add_subdirectory(ydb/library/yql/dq/state/ut) add_subdirectory(ydb/library/yql/parser/pg_catalog/ut) add_subdirectory(ydb/library/yql/providers/common/schema) add_subdirectory(ydb/library/yql/providers/common/schema/skiff) +add_subdirectory(ydb/library/yql/providers/cloud_function/actors) add_subdirectory(ydb/library/yql/public/decimal/ut) add_subdirectory(ydb/library/yql/public/issue/ut) add_subdirectory(ydb/library/yql/public/udf/ut) @@ -1214,6 +1216,8 @@ add_subdirectory(ydb/public/sdk/cpp/examples/ttl) add_subdirectory(ydb/library/yql/providers/common/codec/ut) add_subdirectory(ydb/library/yql/providers/common/structured_token/ut) add_subdirectory(ydb/library/yql/providers/pq/gateway/dummy) +add_subdirectory(ydb/library/yql/providers/cloud_function/actors/ut) +add_subdirectory(ydb/library/yql/providers/common/http_gateway/mock) add_subdirectory(ydb/library/yql/udfs/common/stat/ut) add_subdirectory(ydb/library/yql/udfs/common/topfreq/ut) add_subdirectory(ydb/public/sdk/cpp/client/extensions/discovery_mutator/ut) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index 2b096d75ca..a80add5dfc 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -1287,12 +1287,14 @@ add_subdirectory(ydb/core/yq/libs/hmac/ut) add_subdirectory(ydb/core/yq/libs/result_formatter/ut) add_subdirectory(ydb/core/yq/libs/signer/ut) add_subdirectory(ydb/core/yq/libs/test_connection/ut) +add_subdirectory(ydb/library/yql/dq/actors/transform) add_subdirectory(ydb/library/yql/dq/actors/compute/ut) add_subdirectory(ydb/library/yql/dq/runtime/ut) add_subdirectory(ydb/library/yql/dq/state/ut) add_subdirectory(ydb/library/yql/parser/pg_catalog/ut) add_subdirectory(ydb/library/yql/providers/common/schema) add_subdirectory(ydb/library/yql/providers/common/schema/skiff) +add_subdirectory(ydb/library/yql/providers/cloud_function/actors) add_subdirectory(ydb/library/yql/public/decimal/ut) add_subdirectory(ydb/library/yql/public/issue/ut) add_subdirectory(ydb/library/yql/public/udf/ut) @@ -1309,6 +1311,8 @@ add_subdirectory(ydb/public/sdk/cpp/examples/ttl) add_subdirectory(ydb/library/yql/providers/common/codec/ut) add_subdirectory(ydb/library/yql/providers/common/structured_token/ut) add_subdirectory(ydb/library/yql/providers/pq/gateway/dummy) +add_subdirectory(ydb/library/yql/providers/cloud_function/actors/ut) +add_subdirectory(ydb/library/yql/providers/common/http_gateway/mock) add_subdirectory(ydb/library/yql/udfs/common/stat/ut) add_subdirectory(ydb/library/yql/udfs/common/topfreq/ut) add_subdirectory(ydb/public/sdk/cpp/client/extensions/discovery_mutator/ut) diff --git a/ydb/core/protos/services.proto b/ydb/core/protos/services.proto index 76fd4b5979..72b8384cc3 100644 --- a/ydb/core/protos/services.proto +++ b/ydb/core/protos/services.proto @@ -267,6 +267,8 @@ enum EServiceKikimr { YQL_NODES_MANAGER = 715; + YANDEX_CLOUD_FUNCTION = 716; + // Exports (& imports) EXPORT = 800; DATASHARD_RESTORE = 801; diff --git a/ydb/library/yql/dq/actors/dq_events_ids.h b/ydb/library/yql/dq/actors/dq_events_ids.h index 2049edee75..1022c93201 100644 --- a/ydb/library/yql/dq/actors/dq_events_ids.h +++ b/ydb/library/yql/dq/actors/dq_events_ids.h @@ -50,6 +50,8 @@ struct TDqComputeEvents { EvNewCheckpointCoordinatorAck, EvNewSourceDataArrived, EvSourceError, + EvTransformNewData, + EvTransformCompleted, // place all new events here EvEnd diff --git a/ydb/library/yql/dq/actors/transform/CMakeLists.txt b/ydb/library/yql/dq/actors/transform/CMakeLists.txt new file mode 100644 index 0000000000..4dc0fe6ead --- /dev/null +++ b/ydb/library/yql/dq/actors/transform/CMakeLists.txt @@ -0,0 +1,24 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(dq-actors-transform) +target_compile_options(dq-actors-transform PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(dq-actors-transform PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + yql-dq-runtime + yql-dq-proto +) +target_sources(dq-actors-transform PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/actors/transform/dq_transform_events.cpp +) diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_actor.h b/ydb/library/yql/dq/actors/transform/dq_transform_actor.h new file mode 100644 index 0000000000..0f8e1a6394 --- /dev/null +++ b/ydb/library/yql/dq/actors/transform/dq_transform_actor.h @@ -0,0 +1,11 @@ +#pragma once + +namespace NYql::NDq { + +class IDqTransformActor { +public: + virtual void DoTransform() = 0; + virtual ~IDqTransformActor() = default; +}; + +}
\ No newline at end of file diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.cpp b/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.cpp new file mode 100644 index 0000000000..f7bed20f83 --- /dev/null +++ b/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.cpp @@ -0,0 +1,30 @@ +#include "dq_transform_actor_factory.h" + +#include <ydb/library/yql/utils/yql_panic.h> +#include <google/protobuf/text_format.h> + +namespace NYql::NDq { + +TDqTransformActorFactory::TDqTransformActorFactory() +{} + +std::pair<IDqTransformActor*, NActors::IActor*> TDqTransformActorFactory::CreateDqTransformActor(const NDqProto::TDqTransform& transform, TArguments&& args) { + auto creator = CreatorsByType.find(transform.GetType()); + if (creator == CreatorsByType.end()) { + const google::protobuf::EnumDescriptor* descriptor = NDqProto::ETransformType_descriptor(); + std::string transformName = descriptor->FindValueByNumber(transform.GetType())->name(); + YQL_ENSURE(false, "Unregistered type of transform actor: \"" << transformName << "\""); + } + + std::pair<IDqTransformActor*, NActors::IActor*> actor = (creator->second)(transform, std::move(args)); + Y_VERIFY(actor.first); + Y_VERIFY(actor.second); + return actor; +} + +void TDqTransformActorFactory::Register(NDqProto::ETransformType type, TTransformCreator creator) { + auto [_, registered] = CreatorsByType.emplace(type, std::move(creator)); + Y_VERIFY(registered); +} + +}
\ No newline at end of file diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h b/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h new file mode 100644 index 0000000000..2b57e288ff --- /dev/null +++ b/ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h @@ -0,0 +1,39 @@ +#pragma once + +#include "dq_transform_actor.h" + +#include <ydb/library/yql/dq/common/dq_common.h> +#include <ydb/library/yql/dq/proto/dq_tasks.pb.h> +#include <ydb/library/yql/dq/runtime/dq_output_consumer.h> +#include <ydb/library/yql/dq/runtime/dq_output_channel.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/mkql_program_builder.h> +#include <library/cpp/actors/core/actor.h> + +namespace NYql::NDq { + +class TDqTransformActorFactory : public TThrRefBase { +public: + using TPtr = TIntrusivePtr<TDqTransformActorFactory>; + + struct TArguments { + const NActors::TActorId ComputeActorId; + const IDqOutputChannel::TPtr TransformInput; + const IDqOutputConsumer::TPtr TransformOutput; + const NKikimr::NMiniKQL::THolderFactory& HolderFactory; + const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; + NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder; + }; + + TDqTransformActorFactory(); + + using TTransformCreator = std::function<std::pair<IDqTransformActor*, NActors::IActor*>( + const NDqProto::TDqTransform& transform, TArguments&& args)>; + + std::pair<IDqTransformActor*, NActors::IActor*> CreateDqTransformActor(const NDqProto::TDqTransform& transform, TArguments&& args); + void Register(NDqProto::ETransformType type, TTransformCreator creator); + +private: + std::unordered_map<NDqProto::ETransformType, TTransformCreator> CreatorsByType; +}; +}
\ No newline at end of file diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_events.cpp b/ydb/library/yql/dq/actors/transform/dq_transform_events.cpp new file mode 100644 index 0000000000..b48f009e34 --- /dev/null +++ b/ydb/library/yql/dq/actors/transform/dq_transform_events.cpp @@ -0,0 +1 @@ +#include "dq_transform_events.h"
\ No newline at end of file diff --git a/ydb/library/yql/dq/actors/transform/dq_transform_events.h b/ydb/library/yql/dq/actors/transform/dq_transform_events.h new file mode 100644 index 0000000000..a41c79480e --- /dev/null +++ b/ydb/library/yql/dq/actors/transform/dq_transform_events.h @@ -0,0 +1,23 @@ +#pragma once + +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> +#include <ydb/library/yql/dq/actors/dq_events_ids.h> + +#include <library/cpp/actors/core/events.h> +#include <library/cpp/actors/core/event_local.h> + +namespace NYql::NDq { +using namespace NActors; + +namespace NTransformActor { + struct TEvTransformNewData : public TEventLocal<TEvTransformNewData, TDqComputeEvents::EvTransformNewData> { + TEvTransformNewData() {} + }; + + struct TEvTransformCompleted : public TEventLocal<TEvTransformCompleted, TDqComputeEvents::EvTransformCompleted> { + TEvTransformCompleted() {} + }; + +} // namespace NTransformActor + +}
\ No newline at end of file diff --git a/ydb/library/yql/dq/opt/dq_opt.cpp b/ydb/library/yql/dq/opt/dq_opt.cpp index f60f29dd46..45175199fb 100644 --- a/ydb/library/yql/dq/opt/dq_opt.cpp +++ b/ydb/library/yql/dq/opt/dq_opt.cpp @@ -29,7 +29,7 @@ TDqStageSettings TDqStageSettings::Parse(const TDqStageBase& node) { } else if (name == TransformTypeSetting) { YQL_ENSURE(tuple.Value().Maybe<TCoAtom>()); if (const auto type = tuple.Value().Cast<TCoAtom>().Value(); type == "YANDEX-CLOUD") { - settings.TransformType = NDqProto::TRANSFORM_YANDEX_CLOUD; + settings.TransformType = NDqProto::TRANSFORM_CLOUD_FUNCTION; } else { YQL_ENSURE(false, "Unknown transform type: " << type); } diff --git a/ydb/library/yql/dq/opt/dq_opt.h b/ydb/library/yql/dq/opt/dq_opt.h index 5b3146d0f4..cbb0714aa4 100644 --- a/ydb/library/yql/dq/opt/dq_opt.h +++ b/ydb/library/yql/dq/opt/dq_opt.h @@ -22,7 +22,7 @@ struct TDqStageSettings { bool SinglePartition = false; bool IsExternalFunction = false; - NDqProto::ETransformType TransformType = NDqProto::TRANSFORM_YANDEX_CLOUD; + NDqProto::ETransformType TransformType = NDqProto::TRANSFORM_CLOUD_FUNCTION; TString TransformName; ui32 TransformConcurrency = 0; diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index e29e64ef2c..36d73316fe 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -130,13 +130,18 @@ message TTaskOutput { } enum ETransformType { - TRANSFORM_YANDEX_CLOUD = 0; + TRANSFORM_UNSPECIFIED = 0; + TRANSFORM_CLOUD_FUNCTION = 1; } message TDqTransform { ETransformType Type = 1; string FunctionName = 2; string ConnectionName = 3; + + // binary YSON(codec ydb/library/yql/providers/common/schema/mkql) + string InputType = 4; + string OutputType = 5; } message TDqTask { diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp index 211d4ac39a..e543e1722b 100644 --- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp +++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp @@ -280,6 +280,7 @@ public: ui64 Drop() override { // Drop channel data because channel was finished. Leave checkpoint because checkpoints keep going through channel after finishing channel data transfer. ui64 rows = Data.size(); Data.clear(); + TDataType().swap(Data); return rows; } @@ -757,6 +758,8 @@ public: DataTail.clear(); SizeTail.clear(); MemoryUsed = 0; + TDataType().swap(DataHead); + TDataType().swap(DataTail); // todo: send remove request SpilledRows = 0; SpilledBlobs.clear(); diff --git a/ydb/library/yql/dq/tasks/dq_connection_builder.h b/ydb/library/yql/dq/tasks/dq_connection_builder.h index 2c46061f60..4f4cd8189b 100644 --- a/ydb/library/yql/dq/tasks/dq_connection_builder.h +++ b/ydb/library/yql/dq/tasks/dq_connection_builder.h @@ -47,9 +47,11 @@ void CommonBuildTasks(TDqTasksGraph<TStageInfoMeta, TTaskMeta, TInputMeta, TOutp for (ui32 i = 0; i < partitionsCount; ++i) { auto& task = graph.AddTask(stageInfo); - auto& transform = task.OutputTransform; - transform.Type = stageSettings.TransformType; - transform.FunctionName = stageSettings.TransformName; + if (stageSettings.IsExternalFunction) { + auto& transform = task.OutputTransform; + transform.Type = stageSettings.TransformType; + transform.FunctionName = stageSettings.TransformName; + } } } diff --git a/ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt b/ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt new file mode 100644 index 0000000000..61a6a9637f --- /dev/null +++ b/ydb/library/yql/providers/cloud_function/actors/CMakeLists.txt @@ -0,0 +1,33 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(providers-cloud_function-actors) +target_compile_options(providers-cloud_function-actors PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(providers-cloud_function-actors PUBLIC + contrib-libs-cxxsupp + yutil + cpp-actors-core + library-cpp-retry + yql-dq-runtime + yql-dq-proto + dq-actors-transform + providers-common-http_gateway + providers-common-codec + common-schema-mkql + library-yql-minikql + yql-minikql-computation + yql-public-issue + yql-sql-pg_dummy +) +target_sources(providers-cloud_function-actors PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp +) diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp new file mode 100644 index 0000000000..dc4534c9db --- /dev/null +++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.cpp @@ -0,0 +1,317 @@ +#include "cloud_function_transform.h" + +#include <ydb/core/protos/services.pb.h> + +#include <ydb/library/yql/providers/common/codec/yql_json_codec.h> +#include <ydb/library/yql/minikql/mkql_type_ops.h> +#include <ydb/library/yql/minikql/mkql_node.h> +#include <ydb/library/yql/public/issue/yql_issue.h> +#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h> + +#include <library/cpp/string_utils/base64/base64.h> +#include <library/cpp/string_utils/quote/quote.h> +#include <library/cpp/retry/retry_policy.h> +#include <library/cpp/actors/core/log.h> + +#define LOG_D(s) \ + LOG_DEBUG_S(*NActors::TlsActivationContext, NKikimrServices::YANDEX_CLOUD_FUNCTION, "SelfId: " << this->SelfId() << ", Transform: " << TransformName << ". " << s) +#define LOG_W(s) \ + LOG_WARN_S(*NActors::TlsActivationContext, NKikimrServices::YANDEX_CLOUD_FUNCTION, "SelfId: " << this->SelfId() << ", Transform: " << TransformName << ". " << s) +#define LOG_E(s) \ + LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::YANDEX_CLOUD_FUNCTION, "SelfId: " << this->SelfId() << ", Transform: " << TransformName << ". " << s) + + +namespace NYql::NDq { + +using namespace NActors; +using namespace NCommon; +using namespace NKikimr; + +namespace { + +constexpr TStringBuf CLOUD_FUNCTION_BASE_URL = "https://functions.yandexcloud.net/"; + +constexpr size_t MAX_RETRY = 3; + +ERetryErrorClass RetryByHttpCode(long httpResponseCode) { + switch (httpResponseCode) { + case 429: + return ERetryErrorClass::LongRetry; + case 500: + case 503: + case 504: + return ERetryErrorClass::ShortRetry; + } + return ERetryErrorClass::NoRetry; +} + +const auto RETRY_POLICY = IRetryPolicy<long>::GetExponentialBackoffPolicy(RetryByHttpCode, + TDuration::MilliSeconds(100), + TDuration::MilliSeconds(400), + TDuration::Seconds(5), + MAX_RETRY); + +} + +TCloudFunctionTransformActor::TCloudFunctionTransformActor(TActorId owner, + NDqProto::TDqTransform transform, IHTTPGateway::TPtr gateway, + IDqOutputChannel::TPtr transformInput, + IDqOutputConsumer::TPtr taskOutput, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, + NKikimr::NMiniKQL::TProgramBuilder& programBuilder) +: TActor(&TCloudFunctionTransformActor::StateFunc) +, Owner(owner) +, Transform(transform) +, Gateway(std::move(gateway)) +, TransformInput(std::move(transformInput)) +, TaskOutput(std::move(taskOutput)) +, TransformName(UrlEscapeRet(transform.GetFunctionName(), true)) +, HolderFactory(holderFactory) +, TypeEnv(typeEnv) +, ProgramBuilder(programBuilder) +{ + TStringStream err; + InputRowType = ParseTypeFromYson(TStringBuf{transform.GetInputType()}, ProgramBuilder, err); + YQL_ENSURE(InputRowType, "Can't parse cloud function input type"); + + NMiniKQL::TType* outputType = ParseTypeFromYson(TStringBuf{transform.GetOutputType()}, ProgramBuilder, err); + YQL_ENSURE(outputType, "Can't parse cloud function output type"); + OutputRowsType = NKikimr::NMiniKQL::TListType::Create(outputType, TypeEnv); +} + + +STRICT_STFUNC(TCloudFunctionTransformActor::StateFunc, + hFunc(TEvents::TEvPoison, HandlePoison); + hFunc(TCFTransformEvent::TEvTransformSuccess, Handle); + hFunc(TCFTransformEvent::TEvExecuteTransform, Handle); + hFunc(TEvDq::TEvAbortExecution, Handle); +) + +STATEFN(TCloudFunctionTransformActor::DeadState) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvents::TEvPoison, HandlePoison); + } +} + +void TCloudFunctionTransformActor::Handle(TEvDq::TEvAbortExecution::TPtr& ev) { + auto& msg = ev->Get()->Record; + TIssues issues = ev->Get()->GetIssues(); + if (msg.GetStatusCode() == Ydb::StatusIds::INTERNAL_ERROR) { + InternalError(issues); + } else if (msg.GetStatusCode() == Ydb::StatusIds::ABORTED) { + Aborted(issues); + } else { + RuntimeError(issues); + } +} + +void TCloudFunctionTransformActor::Handle(TCFTransformEvent::TEvExecuteTransform::TPtr& ev) { + Y_UNUSED(ev); + + auto guard = BindAllocator(); + + TString payload; + TStringOutput jsonStream{payload}; + NJson::TJsonWriter jsonWriter{&jsonStream, false}; + + jsonWriter.OpenArray(); + + bool hasPayload = false; + while (TransformInput->HasData() && !TaskOutput->IsFull()) { + NKikimr::NMiniKQL::TUnboxedValueVector rowsToTransform; + if (TransformInput->PopAll(rowsToTransform)) { + hasPayload = true; + for (auto&& row : rowsToTransform) { + try { + NJsonCodec::WriteValueToJson(jsonWriter, row, InputRowType, NJsonCodec::DefaultPolicy::getInstance().CloudFunction()); + } catch (const std::exception& ex) { + auto message = TStringBuilder() + << "Failed to convert data to input json for cloud function '" + << TransformName << "': " << ex.what(); + RuntimeError(message); + return; + } + } + } + } + jsonWriter.CloseArray(); + jsonWriter.Flush(); + + TCFReqContext context; + if (!TransformInput->HasData() && TransformInput->IsFinished()) { + context.LastBatch = true; + } + + if (hasPayload) { + TransformInProgress = true; + + auto* actorSystem = TActivationContext::ActorSystem(); + auto headers = IHTTPGateway::THeaders(); + Gateway->Download(CLOUD_FUNCTION_BASE_URL + TransformName, headers, 10, + std::bind(&TCloudFunctionTransformActor::OnInvokeFinished, actorSystem, SelfId(), + context, + std::placeholders::_1), + payload, RETRY_POLICY); + } else if (context.LastBatch) { + CompleteTransform(); + } +} + +void TCloudFunctionTransformActor::Handle(TCFTransformEvent::TEvTransformSuccess::TPtr& ev) { + auto guard = BindAllocator(); + + NKikimr::NUdf::TUnboxedValue transformedData; + try { + auto body = std::string_view(ev->Get()->Result); + NJson::TJsonValue bodyJson; + if (!NJson::ReadJsonTree(body, &bodyJson, false)) { + YQL_ENSURE(false, "Invalid json"); + } + transformedData = NJsonCodec::ReadJsonValue(bodyJson, OutputRowsType, HolderFactory); + } catch (const std::exception& ex) { + auto message = TStringBuilder() + << "Failed to convert output json from cloud function '" + << TransformName << "': " << ex.what(); + RuntimeError(message); + return; + } + + TransformInProgress = false; + + //if (!TaskOutput->IsFull()) { + TaskOutput->Consume(std::move(transformedData)); + auto newDataEv = new NTransformActor::TEvTransformNewData(); + Send(Owner, newDataEv); + //} + + if (ev->Get()->LastBatch) { + CompleteTransform(); + } +} + +void TCloudFunctionTransformActor::DoTransform() { + if (TransformInProgress) + return; + + auto executeEv = new TCFTransformEvent::TEvExecuteTransform(); + Send(SelfId(), executeEv); +} + +void TCloudFunctionTransformActor::OnInvokeFinished(TActorSystem* actorSystem, TActorId selfId, + TCFReqContext reqContext, + IHTTPGateway::TResult&& result) { + switch (result.index()) { + case 0U: { + auto response = std::get<IHTTPGateway::TContent>(std::move(result)); + switch (response.HttpResponseCode) { + case 200: { + auto event = new TCFTransformEvent::TEvTransformSuccess(std::move(response), reqContext.LastBatch); + actorSystem->Send(new IEventHandle(selfId, TActorId(), event)); + break; + } + case 403: + case 404: { + auto transformName = reqContext.TransformName; + auto message = TStringBuilder() << "Got '" << response.HttpResponseCode << "' response code from cloud function '" + << transformName << "'"; + auto ev = TEvDq::TEvAbortExecution::Unavailable(message); + actorSystem->Send(new IEventHandle(selfId, TActorId(), ev.Release())); + break; + } + case 429: + case 500: + case 503: + case 504: { + auto transformName = reqContext.TransformName; + auto message = TStringBuilder() << "Got '" << response.HttpResponseCode << "' response code from cloud function '" + << transformName << "' and retry has been out"; + auto ev = TEvDq::TEvAbortExecution::Aborted(message); + actorSystem->Send(new IEventHandle(selfId, TActorId(), ev.Release())); + break; + } + default: + auto transformName = reqContext.TransformName; + auto message = TStringBuilder() << "Got unexpected response code (" << response.HttpResponseCode + << ") from cloud function '" << transformName; + auto ev = TEvDq::TEvAbortExecution::Aborted(message); + actorSystem->Send(new IEventHandle(selfId, TActorId(), ev.Release())); + } + break; + } + case 1U: + auto transformName = reqContext.TransformName; + auto message = TStringBuilder() << "Internal error during call cloud function '" << transformName; + auto ev = TEvDq::TEvAbortExecution::InternalError(message, std::get<TIssues>(result)); + actorSystem->Send(new IEventHandle(selfId, TActorId(), ev.Release())); + break; + } +} + + +void TCloudFunctionTransformActor::HandlePoison(NActors::TEvents::TEvPoison::TPtr&) { + PassAway(); +} + +void TCloudFunctionTransformActor::RuntimeError(const TString& message, const TIssues& subIssues) { + LOG_E(message); + RuntimeError(TEvDq::TEvAbortExecution::Unavailable(message, subIssues)->GetIssues()); +} + +void TCloudFunctionTransformActor::RuntimeError(const TIssues& issues) { + TransformInProgress = false; + auto ev = MakeHolder<TEvDq::TEvAbortExecution>(Ydb::StatusIds::UNAVAILABLE, issues); + Send(Owner, ev.Release()); + + Become(&TCloudFunctionTransformActor::DeadState); +} + +void TCloudFunctionTransformActor::InternalError(const TString& message, const TIssues& subIssues) { + LOG_E(message); + InternalError(TEvDq::TEvAbortExecution::InternalError(message, subIssues)->GetIssues()); +} + +void TCloudFunctionTransformActor::InternalError(const TIssues& issues) { + TransformInProgress = false; + auto ev = MakeHolder<TEvDq::TEvAbortExecution>(Ydb::StatusIds::INTERNAL_ERROR, issues); + Send(Owner, ev.Release()); + + Become(&TCloudFunctionTransformActor::DeadState); +} + +void TCloudFunctionTransformActor::Aborted(const TString& message, const TIssues& subIssues) { + LOG_E(message); + Aborted(TEvDq::TEvAbortExecution::Aborted(message, subIssues)->GetIssues()); +} + +void TCloudFunctionTransformActor::Aborted(const TIssues& issues) { + TransformInProgress = false; + auto ev = MakeHolder<TEvDq::TEvAbortExecution>(Ydb::StatusIds::ABORTED, issues); + Send(Owner, ev.Release()); + + Become(&TCloudFunctionTransformActor::DeadState); +} + + +void TCloudFunctionTransformActor::CompleteTransform() { + TransformInProgress = false; + TaskOutput->Finish(); + + auto completeEv = new NTransformActor::TEvTransformCompleted(); + Send(Owner, completeEv); +} + +TGuard<NKikimr::NMiniKQL::TScopedAlloc> TCloudFunctionTransformActor::BindAllocator() { + return TypeEnv.BindAllocator(); +} + +std::pair<IDqTransformActor*, NActors::IActor*> CreateCloudFunctionTransformActor(const NDqProto::TDqTransform& transform, IHTTPGateway::TPtr gateway, TDqTransformActorFactory::TArguments&& args) { + const auto actor = new TCloudFunctionTransformActor( + args.ComputeActorId, transform, gateway, + args.TransformInput, args.TransformOutput, + args.HolderFactory, args.TypeEnv, + args.ProgramBuilder); + return {actor, actor}; +} + +}
\ No newline at end of file diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h new file mode 100644 index 0000000000..85c9b9f073 --- /dev/null +++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform.h @@ -0,0 +1,122 @@ +#pragma once + +#include <ydb/library/yql/dq/actors/dq.h> +#include <ydb/library/yql/dq/actors/transform/dq_transform_actor.h> +#include <ydb/library/yql/dq/actors/transform/dq_transform_events.h> +#include <ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h> +#include <ydb/library/yql/dq/proto/dq_tasks.pb.h> +#include <ydb/library/yql/dq/runtime/dq_output_consumer.h> +#include <ydb/library/yql/dq/runtime/dq_input_channel.h> +#include <ydb/library/yql/dq/runtime/dq_output_channel.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> +#include <ydb/library/yql/minikql/mkql_program_builder.h> + +#include <library/cpp/actors/core/actor_bootstrapped.h> +#include <library/cpp/actors/core/hfunc.h> +#include <library/cpp/actors/core/actor.h> + +namespace NYql::NDq { + +using namespace NActors; +using namespace NKikimr; + +class TCloudFunctionTransformActor : public NActors::TActor<TCloudFunctionTransformActor> + , public IDqTransformActor + , public TThrRefBase +{ +public: + struct TCFTransformEvent { + enum { + EV_BEGIN = EventSpaceBegin(TEvents::ES_PRIVATE), + EV_TRANSFORM_SUCCESS = EV_BEGIN, + EV_EXECUTE_TRANSFORM + }; + + struct TEvTransformSuccess : public TEventLocal<TEvTransformSuccess, EV_TRANSFORM_SUCCESS> { + TEvTransformSuccess(IHTTPGateway::TContent&& result, bool lastBatch) + : Result(std::move(result)) + , LastBatch(lastBatch) + {} + + IHTTPGateway::TContent Result; + bool LastBatch = false; + }; + + struct TEvExecuteTransform : public TEventLocal<TEvExecuteTransform, EV_EXECUTE_TRANSFORM> { + TEvExecuteTransform() {} + }; + }; + +public: + using TPtr = TIntrusivePtr<TCloudFunctionTransformActor>; + + static constexpr char ActorName[] = "YQL_DQ_CLOUD_FUNC_TRANSFORM"; + + TCloudFunctionTransformActor(TActorId owner, + NDqProto::TDqTransform transform, IHTTPGateway::TPtr gateway, + IDqOutputChannel::TPtr transformInput, + IDqOutputConsumer::TPtr taskOutput, + const NKikimr::NMiniKQL::THolderFactory& holderFactory, + const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv, + NKikimr::NMiniKQL::TProgramBuilder& programBuilder); + void DoTransform(); + +private: + STATEFN(StateFunc); + void Handle(TCFTransformEvent::TEvTransformSuccess::TPtr& ev); + void Handle(TCFTransformEvent::TEvExecuteTransform::TPtr& ev); + void Handle(TEvDq::TEvAbortExecution::TPtr& ev); + + STATEFN(DeadState); + void HandlePoison(NActors::TEvents::TEvPoison::TPtr&); + +private: + struct TCFReqContext { + TCFReqContext() {} + TCFReqContext(bool lastBatch): LastBatch(lastBatch) {} + + TString TransformName; + bool LastBatch = false; + }; + + static void OnInvokeFinished(TActorSystem* actorSystem, TActorId selfId, TCFReqContext reqContext, IHTTPGateway::TResult&& result); + + void InternalError(const TIssues& issues); + void InternalError(const TString& message, const TIssues& subIssues = {}); + + void RuntimeError(const TIssues& issues); + void RuntimeError(const TString& message, const TIssues& subIssues = {}); + + void Aborted(const TIssues& issues); + void Aborted(const TString& message, const TIssues& subIssues = {}); + + void CompleteTransform(); + +public: + TGuard<NKikimr::NMiniKQL::TScopedAlloc> BindAllocator(); + +private: + NActors::TActorId Owner; // compute actor Id + + NDqProto::TDqTransform Transform; + IHTTPGateway::TPtr Gateway; + + IDqOutputChannel::TPtr TransformInput; + IDqOutputConsumer::TPtr TaskOutput; + + TString TransformName; + + bool TransformInProgress = false; + + const NKikimr::NMiniKQL::THolderFactory& HolderFactory; + const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; + NKikimr::NMiniKQL::TProgramBuilder& ProgramBuilder; + + NMiniKQL::TType* InputRowType = nullptr; + NMiniKQL::TType* OutputRowsType = nullptr; +}; + +std::pair<IDqTransformActor*, NActors::IActor*> CreateCloudFunctionTransformActor(const NDqProto::TDqTransform& transform, IHTTPGateway::TPtr gateway, TDqTransformActorFactory::TArguments&& args); + +}
\ No newline at end of file diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp new file mode 100644 index 0000000000..ab4d33e308 --- /dev/null +++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.cpp @@ -0,0 +1,15 @@ +#include "cloud_function_transform_factory.h" +#include "cloud_function_transform.h" + +#include <ydb/library/yql/dq/proto/dq_tasks.pb.h> + +namespace NYql::NDq { + +void RegisterTransformCloudFunction(TDqTransformActorFactory& factory, IHTTPGateway::TPtr gateway) { + constexpr NDqProto::ETransformType type = NDqProto::ETransformType::TRANSFORM_CLOUD_FUNCTION; + factory.Register(type, [gateway](const NDqProto::TDqTransform& transform, TDqTransformActorFactory::TArguments&& args) { + return CreateCloudFunctionTransformActor(transform, gateway, std::move(args)); + }); +} + +}
\ No newline at end of file diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h new file mode 100644 index 0000000000..d17f556c73 --- /dev/null +++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_factory.h @@ -0,0 +1,10 @@ +#pragma once + +#include <ydb/library/yql/dq/actors/transform/dq_transform_actor_factory.h> +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> + +namespace NYql::NDq { + +void RegisterTransformCloudFunction(TDqTransformActorFactory& factory, IHTTPGateway::TPtr gateway); + +}
\ No newline at end of file diff --git a/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp new file mode 100644 index 0000000000..df3dfe2a9b --- /dev/null +++ b/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp @@ -0,0 +1,222 @@ +#include "cloud_function_transform.h" + +#include <library/cpp/testing/unittest/registar.h> +#include <library/cpp/actors/testlib/test_runtime.h> + +#include <ydb/library/yql/dq/runtime/dq_output_consumer.h> +#include <ydb/library/yql/minikql/mkql_program_builder.h> +#include <ydb/library/yql/minikql/mkql_function_registry.h> +#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h> +#include <ydb/library/yql/minikql/computation/mkql_value_builder.h> +#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h> +#include <ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.h> + +#include <thread> +#include <chrono> + +namespace { + +using namespace NYql; +using namespace NActors; +using namespace NKikimr; +using namespace NMiniKQL; +using namespace NDq; + +using namespace std::chrono_literals; + +struct TTransformSystem { + IDqOutputChannel::TPtr InputChannel; + IDqOutputChannel::TPtr OutputChannel; + TCloudFunctionTransformActor* TransformActor; + NActors::TActorId TransformActorId; + NActors::TActorId ComputeActorId; +}; + +class TCloudFunctionTransformTest: public TTestBase { +public: + UNIT_TEST_SUITE(TCloudFunctionTransformTest); + UNIT_TEST(TestEmptyChannel) + UNIT_TEST(TestMultiplicationTransform) + UNIT_TEST(TestSeveralIterations) + UNIT_TEST_SUITE_END(); + + TCloudFunctionTransformTest() + : Alloc(std::make_unique<TScopedAlloc>(NKikimr::TAlignedPagePoolCounters(), false)) + , TypeEnv(std::make_unique<TTypeEnvironment>(*Alloc)) + , FunctionRegistry(CreateFunctionRegistry(CreateBuiltinRegistry())) + , ProgramBuilder(*TypeEnv, *FunctionRegistry) + , MemInfo("Mem") + , Gateway(IHTTPMockGateway::Make()) + { + HolderFactory = std::make_unique<NMiniKQL::THolderFactory>(Alloc->Ref(), MemInfo); + ValueBuilder = std::make_unique<TDefaultValueBuilder>(*HolderFactory); + } + + TTransformSystem StartUpTransformSystem(ui32 nodeId, TString transformName) { + auto system = TTransformSystem(); + + NDqProto::TDqTransform taskTransform; + + taskTransform.SetType(NDqProto::ETransformType::TRANSFORM_CLOUD_FUNCTION); + taskTransform.SetFunctionName(transformName); + + TStructMember inMembers[] = { + {"A", TDataType::Create(NUdf::TDataType<bool>::Id, *TypeEnv)}, + {"C", TDataType::Create(NUdf::TDataType<ui32>::Id, *TypeEnv)} + }; + NMiniKQL::TType* inputType = TStructType::Create(2, inMembers, *TypeEnv); + taskTransform.SetInputType(NYql::NCommon::WriteTypeToYson(inputType)); + + TStructMember outMembers[] = { + {"A", TDataType::Create(NUdf::TDataType<bool>::Id, *TypeEnv)}, + {"C", TDataType::Create(NUdf::TDataType<ui64>::Id, *TypeEnv)} + }; + NMiniKQL::TType* outputType = TStructType::Create(2, outMembers, *TypeEnv); + taskTransform.SetOutputType(NYql::NCommon::WriteTypeToYson(outputType)); + + TDqOutputChannelSettings settings; + settings.MaxStoredBytes = 10000; + settings.MaxChunkBytes = 2_MB; + settings.TransportVersion = NDqProto::DATA_TRANSPORT_UV_PICKLE_1_0; + settings.CollectProfileStats = false; + settings.AllowGeneratorsInUnboxedValues = false; + + TLogFunc logger = [](const TString& message) { + Y_UNUSED(message); + }; + + system.InputChannel = CreateDqOutputChannel(nodeId + 100, inputType, *TypeEnv, *HolderFactory, settings, logger); + system.OutputChannel = CreateDqOutputChannel(nodeId + 400, outputType, *TypeEnv, *HolderFactory, settings, logger); + auto outputConsumer = CreateOutputMapConsumer(system.OutputChannel); + + system.ComputeActorId = ActorRuntime_->AllocateEdgeActor(); + system.TransformActor = new TCloudFunctionTransformActor(system.ComputeActorId, + taskTransform, Gateway, + system.InputChannel, outputConsumer, + *HolderFactory, *TypeEnv, ProgramBuilder); + system.TransformActorId = ActorRuntime_->Register(system.TransformActor, nodeId); + + return system; + } + + void SetUp() override { + const ui32 nodesNumber = 6; + ActorRuntime_.Reset(new NActors::TTestActorRuntimeBase(nodesNumber, false)); + ActorRuntime_->Initialize(); + } + + void TearDown() override { + ActorRuntime_.Reset(); + } + + void TestEmptyChannel() { + auto context = StartUpTransformSystem(0, "emptyChannel"); + + context.InputChannel->Finish(); + auto executeEv = new TCloudFunctionTransformActor::TCFTransformEvent::TEvExecuteTransform(); + ActorRuntime_->Send(new IEventHandle(context.TransformActorId, context.ComputeActorId, executeEv)); + + auto event = ActorRuntime_->GrabEdgeEvent<NTransformActor::TEvTransformCompleted>(context.ComputeActorId, TDuration::Seconds(1)); + + UNIT_ASSERT(event); + UNIT_ASSERT_VALUES_EQUAL(true, context.OutputChannel->IsFinished()); + } + + + void TestMultiplicationTransform() { + auto context = StartUpTransformSystem(0, "multiplyC"); + + IHTTPMockGateway::TDataResponse multiplyCResponse = []() { + return IHTTPGateway::TContent("[{\"A\":true,\"C\":1110}]", 200); + }; + + auto url = "https://functions.yandexcloud.net/multiplyC"; + auto headers = IHTTPGateway::THeaders(); + auto postBody = "[{\"A\":\"true\",\"C\":\"555\"}]"; + Gateway->AddDownloadResponse(url, headers, postBody, multiplyCResponse); + + NUdf::TUnboxedValue* items; + auto value = ValueBuilder->NewArray(2, items); + items[0] = NUdf::TUnboxedValuePod(true); + items[1] = NUdf::TUnboxedValuePod(ui32(555)); + + context.InputChannel->Push(std::move(value)); + context.InputChannel->Finish(); + + auto executeEv = new TCloudFunctionTransformActor::TCFTransformEvent::TEvExecuteTransform(); + ActorRuntime_->Send(new IEventHandle(context.TransformActorId, context.ComputeActorId, executeEv)); + auto event = ActorRuntime_->GrabEdgeEvent<NTransformActor::TEvTransformCompleted>(context.ComputeActorId, TDuration::Seconds(1)); + + UNIT_ASSERT(event); + NKikimr::NMiniKQL::TUnboxedValueVector transformedRows; + context.OutputChannel->PopAll(transformedRows); + UNIT_ASSERT_VALUES_EQUAL(transformedRows.size(), 1); + UNIT_ASSERT_VALUES_EQUAL(true, context.OutputChannel->IsFinished()); + transformedRows.clear(); + } + + void TestSeveralIterations() { + auto context = StartUpTransformSystem(0, "multiplyCBatch"); + + IHTTPMockGateway::TDataResponse multiplyCResponse1 = []() { + return IHTTPGateway::TContent("[{\"A\":true,\"C\":860}]", 200); + }; + + IHTTPMockGateway::TDataResponse multiplyCResponse2 = []() { + return IHTTPGateway::TContent("[{\"A\":false,\"C\":800}]", 200); + }; + + auto url = "https://functions.yandexcloud.net/multiplyCBatch"; + auto headers = IHTTPGateway::THeaders(); + auto postBody1 = "[{\"A\":\"true\",\"C\":\"430\"}]"; + Gateway->AddDownloadResponse(url, headers, postBody1, multiplyCResponse1); + auto postBody2 = "[{\"A\":\"false\",\"C\":\"800\"}]"; + Gateway->AddDownloadResponse(url, headers, postBody2, multiplyCResponse2); + + NUdf::TUnboxedValue* items1; + auto value1 = ValueBuilder->NewArray(2, items1); + items1[0] = NUdf::TUnboxedValuePod(true); + items1[1] = NUdf::TUnboxedValuePod(ui32(430)); + context.InputChannel->Push(std::move(value1)); + auto executeEv1 = new TCloudFunctionTransformActor::TCFTransformEvent::TEvExecuteTransform(); + ActorRuntime_->Send(new IEventHandle(context.TransformActorId, context.ComputeActorId, executeEv1)); + + ActorRuntime_->GrabEdgeEvent<NTransformActor::TEvTransformNewData>(context.ComputeActorId, TDuration::MilliSeconds(500)); + + NUdf::TUnboxedValue* items2; + auto value2 = ValueBuilder->NewArray(2, items2); + items2[0] = NUdf::TUnboxedValuePod(false); + items2[1] = NUdf::TUnboxedValuePod(ui32(800)); + context.InputChannel->Push(std::move(value2)); + context.InputChannel->Finish(); + + auto executeEv2 = new TCloudFunctionTransformActor::TCFTransformEvent::TEvExecuteTransform(); + ActorRuntime_->Send(new IEventHandle(context.TransformActorId, context.ComputeActorId, executeEv2)); + + auto event = ActorRuntime_->GrabEdgeEvent<NTransformActor::TEvTransformCompleted>(context.ComputeActorId, TDuration::Seconds(1)); + + UNIT_ASSERT(event); + NKikimr::NMiniKQL::TUnboxedValueVector transformedRows; + context.OutputChannel->PopAll(transformedRows); + UNIT_ASSERT_VALUES_EQUAL(transformedRows.size(), 2); + UNIT_ASSERT_VALUES_EQUAL(true, context.OutputChannel->IsFinished()); + transformedRows.clear(); + } + +private: + THolder<NActors::TTestActorRuntimeBase> ActorRuntime_; + + std::unique_ptr<NMiniKQL::TScopedAlloc> Alloc; + std::unique_ptr<NMiniKQL::TTypeEnvironment> TypeEnv; + + TIntrusivePtr<IFunctionRegistry> FunctionRegistry; + NMiniKQL::TProgramBuilder ProgramBuilder; + NMiniKQL::TMemoryUsageInfo MemInfo; + std::unique_ptr<NMiniKQL::THolderFactory> HolderFactory; + + std::unique_ptr<TDefaultValueBuilder> ValueBuilder; + IHTTPMockGateway::TPtr Gateway; +}; + +UNIT_TEST_SUITE_REGISTRATION(TCloudFunctionTransformTest) +}
\ No newline at end of file diff --git a/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.darwin.txt b/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.darwin.txt new file mode 100644 index 0000000000..d3fffe049d --- /dev/null +++ b/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.darwin.txt @@ -0,0 +1,55 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(library-yql-providers-cloud_function-actors-ut) +target_compile_options(library-yql-providers-cloud_function-actors-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(library-yql-providers-cloud_function-actors-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/cloud_function/actors +) +target_link_libraries(library-yql-providers-cloud_function-actors-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + cpp-testing-unittest_main + providers-cloud_function-actors + cpp-actors-testlib + yql-minikql-invoke_builtins + udf-service-exception_policy + yql-minikql-computation + providers-common-http_gateway + common-http_gateway-mock +) +target_sources(library-yql-providers-cloud_function-actors-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp +) +add_test( + NAME + library-yql-providers-cloud_function-actors-ut + COMMAND + library-yql-providers-cloud_function-actors-ut + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +target_link_flags(library-yql-providers-cloud_function-actors-ut + PUBLIC + -Wl,-no_deduplicate + -Wl,-sdk_version,10.15 + -fPIC + -fPIC + -framework + CoreFoundation + -framework + CoreFoundation +) +vcs_info(library-yql-providers-cloud_function-actors-ut) diff --git a/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.linux.txt b/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.linux.txt new file mode 100644 index 0000000000..c159a94b38 --- /dev/null +++ b/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.linux.txt @@ -0,0 +1,56 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(library-yql-providers-cloud_function-actors-ut) +target_compile_options(library-yql-providers-cloud_function-actors-ut PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_include_directories(library-yql-providers-cloud_function-actors-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/cloud_function/actors +) +target_link_libraries(library-yql-providers-cloud_function-actors-ut PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-lfalloc + library-cpp-cpuid_check + cpp-testing-unittest_main + providers-cloud_function-actors + cpp-actors-testlib + yql-minikql-invoke_builtins + udf-service-exception_policy + yql-minikql-computation + providers-common-http_gateway + common-http_gateway-mock +) +target_sources(library-yql-providers-cloud_function-actors-ut PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/cloud_function/actors/cloud_function_transform_ut.cpp +) +add_test( + NAME + library-yql-providers-cloud_function-actors-ut + COMMAND + library-yql-providers-cloud_function-actors-ut + --print-before-suite + --print-before-test + --fork-tests + --print-times + --show-fails +) +target_link_flags(library-yql-providers-cloud_function-actors-ut + PUBLIC + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +vcs_info(library-yql-providers-cloud_function-actors-ut) diff --git a/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.txt b/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.txt new file mode 100644 index 0000000000..a681d385f3 --- /dev/null +++ b/ydb/library/yql/providers/cloud_function/actors/ut/CMakeLists.txt @@ -0,0 +1,13 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (APPLE) + include(CMakeLists.darwin.txt) +elseif (UNIX) + include(CMakeLists.linux.txt) +endif() diff --git a/ydb/library/yql/providers/common/http_gateway/mock/CMakeLists.txt b/ydb/library/yql/providers/common/http_gateway/mock/CMakeLists.txt new file mode 100644 index 0000000000..2926e538fd --- /dev/null +++ b/ydb/library/yql/providers/common/http_gateway/mock/CMakeLists.txt @@ -0,0 +1,21 @@ + +# This file was gererated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(common-http_gateway-mock) +target_compile_options(common-http_gateway-mock PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(common-http_gateway-mock PUBLIC + contrib-libs-cxxsupp + yutil + providers-common-http_gateway +) +target_sources(common-http_gateway-mock PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp +) diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp new file mode 100644 index 0000000000..194bf2c6e7 --- /dev/null +++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.cpp @@ -0,0 +1,92 @@ +#include "yql_http_mock_gateway.h" + +#include <ydb/library/yql/utils/yql_panic.h> + +namespace NYql { +namespace { + +using TKeyType = std::tuple<TString, IHTTPGateway::THeaders, TString>; + +class TKeyHash { +public: + TKeyHash() : Hash() {} + + size_t operator()(const TKeyType& key) const { + const auto& headers = std::get<1U>(key); + auto initHash = CombineHashes(Hash(std::get<0U>(key)), Hash(std::get<2U>(key))); + return std::accumulate(headers.cbegin(), headers.cend(), initHash, + [this](size_t hash, const TString& item) { return CombineHashes(hash, Hash(item)); }); + } +public: + const std::hash<TString> Hash; +}; + +class THTTPMockGateway : public IHTTPMockGateway { +friend class IHTTPMockGateway; +public: + using TPtr = std::shared_ptr<THTTPMockGateway>; + + THTTPMockGateway() { + } + + ~THTTPMockGateway() { + } + + void Download( + TString url, + IHTTPGateway::THeaders headers, + std::size_t expectedSize, + IHTTPGateway::TOnResult callback, + TString data, + IRetryPolicy<long>::TPtr retryPolicy) + { + + Y_UNUSED(expectedSize); + Y_UNUSED(retryPolicy); + + auto key = TKeyType(url, headers, data); + if (RequestsResponse.contains(key)) { + for (auto response : RequestsResponse[key]) { + callback(response()); + } + } else if (DefaultResponse) { + callback(DefaultResponse(url, headers, data)); + } else { + YQL_ENSURE(false, "There isn't any response callback at url " + url); + } + } + + virtual void Download( + TString , + IHTTPGateway::THeaders , + std::size_t , + IHTTPGateway::TOnNewDataPart , + IHTTPGateway::TOnDowloadFinsh ) { + } + + void AddDefaultResponse(TDataDefaultResponse response) { + DefaultResponse = response; + } + + void AddDownloadResponse( + TString url, + IHTTPGateway::THeaders headers, + TString data, + TDataResponse response) { + + auto& entry = RequestsResponse[TKeyType(url, headers, data)]; + entry.emplace_back(std::move(response)); + } + +private: + std::unordered_map<TKeyType, std::vector<TDataResponse>, TKeyHash> RequestsResponse; + TDataDefaultResponse DefaultResponse; +}; +} + + +IHTTPMockGateway::TPtr IHTTPMockGateway::Make() { + return std::make_shared<THTTPMockGateway>(); +} + +}
\ No newline at end of file diff --git a/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.h b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.h new file mode 100644 index 0000000000..447d608384 --- /dev/null +++ b/ydb/library/yql/providers/common/http_gateway/mock/yql_http_mock_gateway.h @@ -0,0 +1,25 @@ +#pragma once + +#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h> + +namespace NYql { +class IHTTPMockGateway : public IHTTPGateway { +public: + using TPtr = std::shared_ptr<IHTTPMockGateway>; + + virtual ~IHTTPMockGateway() = default; + + static TPtr Make(); + + using TDataDefaultResponse = std::function<IHTTPGateway::TResult(TString, IHTTPGateway::THeaders, TString)>; + using TDataResponse = std::function<IHTTPGateway::TResult()>; + + virtual void AddDefaultResponse(TDataDefaultResponse responseCallback) = 0; + + virtual void AddDownloadResponse( + TString url, + IHTTPGateway::THeaders headers, + TString data, + TDataResponse responseCallback) = 0; +}; +}
\ No newline at end of file diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp index ffe7a3e6b3..44f076536a 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp @@ -2,6 +2,7 @@ #include <contrib/libs/curl/include/curl/curl.h> #include <util/stream/str.h> +#include <util/string/builder.h> #include <util/generic/size_literals.h> #include <thread> @@ -112,12 +113,15 @@ private: if (CURLE_OK != result) return Fail(TIssue(curl_easy_strerror(result))); + long httpResponseCode = 0; + curl_easy_getinfo(GetHandle(), CURLINFO_RESPONSE_CODE, &httpResponseCode); + const std::unique_lock lock(SyncCallbacks); while (!Callbacks.empty()) { if (1U == Callbacks.size()) - Callbacks.top()(IHTTPGateway::TContent(std::move(Buffer))); + Callbacks.top()(IHTTPGateway::TContent(std::move(Buffer), httpResponseCode)); else - Callbacks.top()(IHTTPGateway::TContent(Buffer)); + Callbacks.top()(IHTTPGateway::TContent(Buffer, httpResponseCode)); Callbacks.pop(); } } @@ -518,22 +522,32 @@ private: } -IHTTPGateway::TContent::TContent(TString&& data) +IHTTPGateway::TContent::TContent(TString&& data, long httpResponseCode) : TString(std::move(data)) + , HttpResponseCode(httpResponseCode) { if (!empty()) { THTTPMultiGateway::OutputSize.fetch_add(size()); } } -IHTTPGateway::TContent::TContent(const TString& data) +IHTTPGateway::TContent::TContent(const TString& data, long httpResponseCode) : TString(data) + , HttpResponseCode(httpResponseCode) { if (!empty()) { THTTPMultiGateway::OutputSize.fetch_add(size()); } } +IHTTPGateway::TContent::TContent(TString&& data) + : TContent(std::move(data), 0) +{} + +IHTTPGateway::TContent::TContent(const TString& data) + : TContent(data, 0) +{} + TString IHTTPGateway::TContent::Extract() { if (!empty()) { THTTPMultiGateway::OutputSize.fetch_sub(size()); diff --git a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h index 991f0b2768..8872270a09 100644 --- a/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h +++ b/ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h @@ -28,6 +28,8 @@ public: class TContent : private TString { friend class TEasyCurl; public: + TContent(TString&& data, long httpResponseCode); + TContent(const TString& data, long httpResponseCode); TContent(TString&& data); TContent(const TString& data); @@ -44,6 +46,9 @@ public: private: TContent(const TContent&) = delete; TContent& operator=(const TContent&) = delete; + + public: + long HttpResponseCode; }; using TResult = std::variant<TContent, TIssues>; diff --git a/ydb/library/yql/providers/dq/planner/execution_planner.cpp b/ydb/library/yql/providers/dq/planner/execution_planner.cpp index ec404c0a6d..9531a0204b 100644 --- a/ydb/library/yql/providers/dq/planner/execution_planner.cpp +++ b/ydb/library/yql/providers/dq/planner/execution_planner.cpp @@ -359,10 +359,6 @@ namespace NYql::NDqs { FillOutputDesc(*taskDesc.AddOutputs(), output); } - auto& transform = *taskDesc.MutableOutputTransform(); - transform.SetType(task.OutputTransform.Type); - transform.SetFunctionName(task.OutputTransform.FunctionName); - auto& program = *taskDesc.MutableProgram(); program.SetRuntimeVersion(NYql::NDqProto::ERuntimeVersion::RUNTIME_VERSION_YQL_1_0); TString programStr; @@ -479,9 +475,11 @@ namespace NYql::NDqs { task.Inputs[dqSourceInputIndex].SourceSettings = sourceSettings; task.Inputs[dqSourceInputIndex].SourceType = sourceType; } - auto& transform = task.OutputTransform; - transform.Type = stageSettings.TransformType; - transform.FunctionName = stageSettings.TransformName; + if (stageSettings.IsExternalFunction) { + auto& transform = task.OutputTransform; + transform.Type = stageSettings.TransformType; + transform.FunctionName = stageSettings.TransformName; + } } } return !parts.empty(); |