diff options
author | aozeritsky <aozeritsky@yandex-team.ru> | 2022-04-26 15:41:17 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@yandex-team.ru> | 2022-04-26 15:41:17 +0300 |
commit | b9075f100dc7e5e913fb97c1ae0b6984b1ff5c88 (patch) | |
tree | 0461cb046024ed17b6249a9d8b84b07ec0ccff79 | |
parent | 70eabc9449f88f18556c7839ba6cfbacc69de968 (diff) | |
download | ydb-b9075f100dc7e5e913fb97c1ae0b6984b1ff5c88.tar.gz |
YQL-10818 Make RuntimeJobNode for files with size > 1_MB
ref:c5a8c6b69406b67262ee1e902a4869221e0564c5
-rw-r--r-- | CMakeLists.darwin.txt | 2 | ||||
-rw-r--r-- | CMakeLists.linux.txt | 2 | ||||
-rw-r--r-- | ydb/library/yql/dq/comp_nodes/CMakeLists.txt | 3 | ||||
-rw-r--r-- | ydb/library/yql/dq/comp_nodes/yql_common_dq_transform.cpp | 10 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/provider/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp | 23 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp | 8 | ||||
-rw-r--r-- | ydb/library/yql/providers/dq/provider/yql_dq_state.h | 3 |
8 files changed, 44 insertions, 8 deletions
diff --git a/CMakeLists.darwin.txt b/CMakeLists.darwin.txt index 6847794e71..1eb4ac1875 100644 --- a/CMakeLists.darwin.txt +++ b/CMakeLists.darwin.txt @@ -621,6 +621,7 @@ add_subdirectory(ydb/library/yql/providers/common/db_id_async_resolver) add_subdirectory(ydb/library/yql/providers/dq/provider) add_subdirectory(library/cpp/threading/task_scheduler) add_subdirectory(ydb/public/lib/yson_value) +add_subdirectory(ydb/library/yql/dq/comp_nodes) add_subdirectory(ydb/library/yql/providers/common/metrics) add_subdirectory(ydb/library/yql/providers/common/transform) add_subdirectory(ydb/library/yql/providers/dq/api/grpc) @@ -807,7 +808,6 @@ add_subdirectory(ydb/core/yq/libs/health) add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery) add_subdirectory(ydb/core/yq/libs/test_connection) add_subdirectory(ydb/core/yq/libs/test_connection/events) -add_subdirectory(ydb/library/yql/dq/comp_nodes) add_subdirectory(ydb/library/yql/providers/clickhouse/actors) add_subdirectory(ydb/library/yql/providers/pq/async_io) add_subdirectory(ydb/library/yql/providers/pq/gateway/native) diff --git a/CMakeLists.linux.txt b/CMakeLists.linux.txt index a3ffccb62a..5059d076b1 100644 --- a/CMakeLists.linux.txt +++ b/CMakeLists.linux.txt @@ -701,6 +701,7 @@ add_subdirectory(ydb/library/yql/providers/common/db_id_async_resolver) add_subdirectory(ydb/library/yql/providers/dq/provider) add_subdirectory(library/cpp/threading/task_scheduler) add_subdirectory(ydb/public/lib/yson_value) +add_subdirectory(ydb/library/yql/dq/comp_nodes) add_subdirectory(ydb/library/yql/providers/common/metrics) add_subdirectory(ydb/library/yql/providers/common/transform) add_subdirectory(ydb/library/yql/providers/dq/api/grpc) @@ -887,7 +888,6 @@ add_subdirectory(ydb/core/yq/libs/health) add_subdirectory(ydb/public/sdk/cpp/client/ydb_discovery) add_subdirectory(ydb/core/yq/libs/test_connection) add_subdirectory(ydb/core/yq/libs/test_connection/events) -add_subdirectory(ydb/library/yql/dq/comp_nodes) add_subdirectory(ydb/library/yql/providers/clickhouse/actors) add_subdirectory(ydb/library/yql/providers/pq/async_io) add_subdirectory(ydb/library/yql/providers/pq/gateway/native) diff --git a/ydb/library/yql/dq/comp_nodes/CMakeLists.txt b/ydb/library/yql/dq/comp_nodes/CMakeLists.txt index 60664b75c5..257099d085 100644 --- a/ydb/library/yql/dq/comp_nodes/CMakeLists.txt +++ b/ydb/library/yql/dq/comp_nodes/CMakeLists.txt @@ -15,9 +15,10 @@ target_link_libraries(yql-dq-comp_nodes PUBLIC contrib-libs-cxxsupp yutil cpp-actors-core + dq-actors-compute yql-minikql-computation library-yql-minikql - dq-actors-compute + library-yql-utils ) target_sources(yql-dq-comp_nodes PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/comp_nodes/yql_common_dq_factory.cpp diff --git a/ydb/library/yql/dq/comp_nodes/yql_common_dq_transform.cpp b/ydb/library/yql/dq/comp_nodes/yql_common_dq_transform.cpp index d4a4b07a6a..89940a2d40 100644 --- a/ydb/library/yql/dq/comp_nodes/yql_common_dq_transform.cpp +++ b/ydb/library/yql/dq/comp_nodes/yql_common_dq_transform.cpp @@ -3,8 +3,10 @@ #include <ydb/library/yql/minikql/mkql_function_registry.h> #include <ydb/library/yql/minikql/mkql_program_builder.h> #include <ydb/library/yql/minikql/mkql_node_cast.h> +#include <ydb/library/yql/utils/yql_panic.h> #include <library/cpp/actors/core/actor.h> +#include <util/stream/file.h> namespace NYql { @@ -26,6 +28,14 @@ public: return pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(selfId.ToString()); }; } + if (name == "FileContentJob") { + return [](TCallable& callable, const TTypeEnvironment& env) { + YQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 argument"); + const TString path(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef()); + auto content = TFileInput(path).ReadAll(); + return TRuntimeNode(BuildDataLiteral(content, NUdf::TDataType<char*>::Id, env), true); + }; + } return TCallableVisitFunc(); } diff --git a/ydb/library/yql/providers/dq/provider/CMakeLists.txt b/ydb/library/yql/providers/dq/provider/CMakeLists.txt index 990310699b..03bddd9f96 100644 --- a/ydb/library/yql/providers/dq/provider/CMakeLists.txt +++ b/ydb/library/yql/providers/dq/provider/CMakeLists.txt @@ -19,6 +19,7 @@ target_link_libraries(providers-dq-provider PUBLIC public-lib-yson_value cpp-client-ydb_driver library-yql-core + yql-dq-comp_nodes yql-dq-tasks yql-dq-type_ann providers-common-gateway diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 7fcc73768b..95857b7d4d 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -124,6 +124,7 @@ public: executionContext.ComputationFactory = State->ComputationFactory; executionContext.RandomProvider = randomProvider.Get(); executionContext.TimeProvider = timeProvider.Get(); + executionContext.FuncProvider = State->TransformFactory({}, executionContext.FuncRegistry); NDq::TDqTaskRunnerMemoryLimits limits; limits.ChannelBufferSize = 10_MB; @@ -379,13 +380,23 @@ private: const TProgramBuilder pgmBuilder(typeEnv, *State->FunctionRegistry); TRuntimeNode result; + bool doUpload = false; switch (block->Type) { case EUserDataType::URL: case EUserDataType::PATH: { - TString content = (name == TStringBuf("FilePath")) - ? fullFileName - : TFileInput(block->FrozenFile->GetPath()).ReadAll(); - result = pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(content); + TString content = fullFileName; + if (name == TStringBuf("FileContent")) { + if (block->FrozenFile->GetSize() < MaxFileReadSize) { + content = TFileInput(block->FrozenFile->GetPath()).ReadAll(); + } else { + TCallableBuilder builder(typeEnv, TStringBuf("FileContentJob"), callable.GetType()->GetReturnType(), false); + builder.Add(pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(fullFileName)); + result = TRuntimeNode(builder.Build(), false); + doUpload = true; + } + } + if (!result) + result = pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(content); break; } case EUserDataType::RAW_INLINE_DATA: { @@ -402,7 +413,7 @@ private: if (result.GetNode() != node) { callable.SetResult(result, typeEnv); } - if (name == TStringBuf("FilePath")) { + if (name == TStringBuf("FilePath") || doUpload) { // filePath, fileName, md5 auto f = IDqGateway::TFileResource(); f.SetLocalPath(filePath); @@ -1257,6 +1268,8 @@ private: THolder<IGraphTransformer> DqTypeAnnotationTransformer; mutable THashMap<TString, TFileLinkPtr> FileLinks; mutable THashMap<TString, TString> ModulesMapping; + + const ui64 MaxFileReadSize = 1_MB; }; } diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp index 69788c5c76..cc75204159 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_provider.cpp @@ -8,6 +8,9 @@ #include <ydb/library/yql/providers/common/provider/yql_provider.h> #include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/providers/dq/interface/yql_dq_task_transform.h> +#include <ydb/library/yql/dq/comp_nodes/yql_common_dq_transform.h> + #include <ydb/library/yql/utils/log/log.h> namespace NYql { @@ -32,11 +35,16 @@ TDataProviderInitializer GetDqDataProviderInitializer( ) { Y_UNUSED(userName); + auto dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({ + NYql::CreateCommonDqTaskTransformFactory() + }); + TDqStatePtr state = MakeIntrusive<TDqState>( dqGateway, // nullptr for yqlrun gatewaysConfig, functionRegistry, compFactory, + dqTaskTransformFactory, randomProvider, typeCtx.Get(), progressWriter, diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_state.h b/ydb/library/yql/providers/dq/provider/yql_dq_state.h index a3e54dcb11..a3fcaf1dbd 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_state.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_state.h @@ -18,6 +18,7 @@ struct TDqState: public TThrRefBase { const TGatewaysConfig* GatewaysConfig; const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry; NKikimr::NMiniKQL::TComputationNodeFactory ComputationFactory; + TTaskTransformFactory TransformFactory; TIntrusivePtr<IRandomProvider> RandomProvider; TTypeAnnotationContext* TypeCtx; const TOperationProgressWriter ProgressWriter; @@ -40,6 +41,7 @@ struct TDqState: public TThrRefBase { const TGatewaysConfig* gatewaysConfig, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, NKikimr::NMiniKQL::TComputationNodeFactory compFactory, + TTaskTransformFactory transformFactory, const TIntrusivePtr<IRandomProvider>& randomProvider, TTypeAnnotationContext* typeCtx, const TOperationProgressWriter& progressWriter, @@ -54,6 +56,7 @@ struct TDqState: public TThrRefBase { , GatewaysConfig(gatewaysConfig) , FunctionRegistry(functionRegistry) , ComputationFactory(compFactory) + , TransformFactory(transformFactory) , RandomProvider(randomProvider) , TypeCtx(typeCtx) , ProgressWriter(progressWriter) |